import aiofiles import asyncio import base64 import cv2 import fitz import glob import io import json import logging import math import mistune import os import pandas as pd import pytz import random import re import requests import shutil import streamlit as st import streamlit.components.v1 as components import sys import time import torch import zipfile from audio_recorder_streamlit import audio_recorder from bs4 import BeautifulSoup from collections import deque from contextlib import redirect_stdout from dataclasses import dataclass from datetime import datetime from diffusers import StableDiffusionPipeline from dotenv import load_dotenv from gradio_client import Client, handle_file from huggingface_hub import InferenceClient from io import BytesIO from moviepy import VideoFileClip from openai import OpenAI from PIL import Image from PyPDF2 import PdfReader from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel from typing import Optional from urllib.parse import quote from xml.etree import ElementTree as ET client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'), organization=os.getenv('OPENAI_ORG_ID')) logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) log_records = [] class LogCaptureHandler(logging.Handler): def emit(self, record): log_records.append(record) logger.addHandler(LogCaptureHandler()) st.set_page_config( page_title="AI Multimodal Titan πŸš€", page_icon="πŸ€–", layout="wide", initial_sidebar_state="expanded", menu_items={ 'Get Help': 'https://huggingface.co/awacke1', 'Report a Bug': 'https://huggingface.co/spaces/awacke1', 'About': "AI Multimodal Titan: PDFs, OCR, Image Gen, Audio/Video, Code Execution, and More! 🌌" } ) for key in ['history', 'messages', 'processing', 'asset_checkboxes', 'downloaded_pdfs', 'unique_counter', 'search_queries']: st.session_state.setdefault(key, [] if key in ['history', 'messages', 'search_queries'] else {} if key in ['asset_checkboxes', 'downloaded_pdfs', 'processing'] else 0 if key == 'unique_counter' else None) st.session_state.setdefault('builder', None) st.session_state.setdefault('model_loaded', False) st.session_state.setdefault('selected_model_type', "Causal LM") st.session_state.setdefault('selected_model', "None") st.session_state.setdefault('gallery_size', 2) st.session_state.setdefault('asset_gallery_container', st.sidebar.empty()) st.session_state.setdefault('cam0_file', None) st.session_state.setdefault('cam1_file', None) st.session_state.setdefault('openai_model', "gpt-4o-2024-05-13") def get_gpu_info(): if torch.cuda.is_available(): gpu_name = torch.cuda.get_device_name(0) total_memory = torch.cuda.get_device_properties(0).total_memory / (1024 ** 3) reserved_memory = torch.cuda.memory_reserved(0) / (1024 ** 3) allocated_memory = torch.cuda.memory_allocated(0) / (1024 ** 3) free_memory = total_memory - allocated_memory utilization = torch.cuda.utilization(0) return { "GPU Name": gpu_name, "Total Memory (GB)": f"{total_memory:.2f}", "Reserved Memory (GB)": f"{reserved_memory:.2f}", "Allocated Memory (GB)": f"{allocated_memory:.2f}", "Free Memory (GB)": f"{free_memory:.2f}", "Utilization (%)": utilization } else: return {"Status": "No GPU detected"} def display_gpu_info(): gpu_info = get_gpu_info() st.sidebar.subheader("GPU Status πŸ“Š") if "Status" in gpu_info and gpu_info["Status"] == "No GPU detected": st.sidebar.warning("No GPU detected. Running on CPU.") else: for key, value in gpu_info.items(): st.sidebar.write(f"{key}: {value}") memory_usage_percent = (float(gpu_info["Allocated Memory (GB)"]) / float(gpu_info["Total Memory (GB)"])) * 100 st.sidebar.progress(min(memory_usage_percent / 100, 1.0)) st.sidebar.caption(f"Memory Usage: {memory_usage_percent:.1f}%") @dataclass class ModelConfig: name: str base_model: str size: str domain: Optional[str] = None model_type: str = "causal_lm" @property def model_path(self): return f"models/{self.name}" @dataclass class DiffusionConfig: name: str base_model: str size: str domain: Optional[str] = None @property def model_path(self): return f"diffusion_models/{self.name}" class ModelBuilder: def __init__(self): self.config = None self.model = None self.tokenizer = None self.jokes = [ "Why did the AI go to therapy? Too many layers to unpack! πŸ˜‚", "Training complete! Time for a binary coffee break. β˜•", "I told my neural network a joke; it couldn’t stop dropping bits! πŸ€–", "I asked the AI for a pun, and it said, 'I’m punning on parallel processing!' πŸ˜„", "Debugging my code is like a stand-up routineβ€”always a series of exceptions! πŸ˜†" ] def load_model(self, model_path: str, config: Optional[ModelConfig] = None): with st.spinner(f"Loading {model_path}... ⏳"): self.model = AutoModelForCausalLM.from_pretrained(model_path) self.tokenizer = AutoTokenizer.from_pretrained(model_path) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token if config: self.config = config device = "cuda" if torch.cuda.is_available() else "cpu" self.model.to(device) st.success(f"Model loaded on {device}! πŸŽ‰ {random.choice(self.jokes)}") return self def save_model(self, path: str): with st.spinner("Saving model... πŸ’Ύ"): os.makedirs(os.path.dirname(path), exist_ok=True) self.model.save_pretrained(path) self.tokenizer.save_pretrained(path) st.success(f"Model saved at {path}! βœ…") class DiffusionBuilder: def __init__(self): self.config = None self.pipeline = None def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None): with st.spinner(f"Loading diffusion model {model_path}... ⏳"): self.pipeline = StableDiffusionPipeline.from_pretrained(model_path, torch_dtype=torch.float32).to("cpu") if config: self.config = config st.success("Diffusion model loaded! 🎨") return self def save_model(self, path: str): with st.spinner("Saving diffusion model... πŸ’Ύ"): os.makedirs(os.path.dirname(path), exist_ok=True) self.pipeline.save_pretrained(path) st.success(f"Diffusion model saved at {path}! βœ…") def generate(self, prompt: str): return self.pipeline(prompt, num_inference_steps=20).images[0] def generate_filename(sequence, ext="png", prompt=None): central = pytz.timezone('US/Central') safe_date_time = datetime.now(central).strftime("%m%d_%H%M") if prompt: safe_prompt = re.sub(r'[<>:"/\\|?*\n]', '_', prompt)[:240] return f"{safe_date_time}_{safe_prompt}.{ext}" return f"{sequence}_{time.strftime('%d%m%Y%H%M%S')}.{ext}" def pdf_url_to_filename(url): return re.sub(r'[<>:"/\\|?*]', '_', url) + ".pdf" def get_download_link(file_path, mime_type="application/pdf", label="Download"): with open(file_path, "rb") as f: data = base64.b64encode(f.read()).decode() return f'{label}' def zip_directory(directory_path, zip_path): with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, _, files in os.walk(directory_path): for file in files: zipf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), os.path.dirname(directory_path))) def get_model_files(model_type="causal_lm"): return [d for d in glob.glob("models/*" if model_type == "causal_lm" else "diffusion_models/*") if os.path.isdir(d)] or ["None"] def get_gallery_files(file_types=["png", "pdf", "md", "wav", "mp4"]): return sorted(list({f for ext in file_types for f in glob.glob(f"*.{ext}")})) def get_pdf_files(): return sorted(glob.glob("*.pdf")) def download_pdf(url, output_path): try: response = requests.get(url, stream=True, timeout=10) if response.status_code == 200: with open(output_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) return True except requests.RequestException as e: logger.error(f"Failed to download {url}: {e}") return False async def process_pdf_snapshot(pdf_path, mode="single"): start_time = time.time() status = st.empty() status.text(f"Processing PDF Snapshot ({mode})... (0s)") try: doc = fitz.open(pdf_path) output_files = [] if mode == "single": page = doc[0] pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) output_file = generate_filename("single", "png") pix.save(output_file) output_files.append(output_file) elif mode == "twopage": if len(doc) >= 2: pix1 = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) pix2 = doc[1].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) img1 = Image.frombytes("RGB", [pix1.width, pix1.height], pix1.samples) img2 = Image.frombytes("RGB", [pix2.width, pix2.height], pix2.samples) combined_img = Image.new("RGB", (pix1.width + pix2.width, max(pix1.height, pix2.height))) combined_img.paste(img1, (0, 0)) combined_img.paste(img2, (pix1.width, 0)) output_file = generate_filename("twopage", "png") combined_img.save(output_file) output_files.append(output_file) else: page = doc[0] pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) output_file = generate_filename("single", "png") pix.save(output_file) output_files.append(output_file) elif mode == "allpages": for i in range(len(doc)): page = doc[i] pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) output_file = generate_filename(f"page_{i}", "png") pix.save(output_file) output_files.append(output_file) doc.close() elapsed = int(time.time() - start_time) status.text(f"PDF Snapshot ({mode}) completed in {elapsed}s!") return output_files except Exception as e: status.error(f"Failed to process PDF: {str(e)}") return [] async def process_ocr(image, output_file): start_time = time.time() status = st.empty() status.text("Processing GOT-OCR2_0... (0s)") tokenizer = AutoTokenizer.from_pretrained("ucaslcl/GOT-OCR2_0", trust_remote_code=True) model = AutoModel.from_pretrained("ucaslcl/GOT-OCR2_0", trust_remote_code=True, torch_dtype=torch.float32) device = "cuda" if torch.cuda.is_available() else "cpu" model.to(device).eval() temp_file = generate_filename("temp", "png") image.save(temp_file) result = model.chat(tokenizer, temp_file, ocr_type='ocr') os.remove(temp_file) elapsed = int(time.time() - start_time) status.text(f"GOT-OCR2_0 completed in {elapsed}s on {device}!") async with aiofiles.open(output_file, "w") as f: await f.write(result) return result async def process_image_gen(prompt, output_file): start_time = time.time() status = st.empty() status.text("Processing Image Gen... (0s)") pipeline = (st.session_state['builder'].pipeline if st.session_state.get('builder') and isinstance(st.session_state['builder'], DiffusionBuilder) and st.session_state['builder'].pipeline else StableDiffusionPipeline.from_pretrained("OFA-Sys/small-stable-diffusion-v0", torch_dtype=torch.float32).to("cpu")) gen_image = pipeline(prompt, num_inference_steps=20).images[0] elapsed = int(time.time() - start_time) status.text(f"Image Gen completed in {elapsed}s!") gen_image.save(output_file) return gen_image def process_image_with_prompt(image, prompt, model="gpt-4o-mini", detail="auto"): buffered = BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") messages = [{"role": "user", "content": [{"type": "text", "text": prompt}, {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_str}", "detail": detail}}]}] try: response = client.chat.completions.create(model=model, messages=messages, max_tokens=300) return response.choices[0].message.content except Exception as e: return f"Error processing image with GPT: {str(e)}" def process_text_with_prompt(text, prompt, model="gpt-4o-mini"): messages = [{"role": "user", "content": f"{prompt}\n\n{text}"}] try: response = client.chat.completions.create(model=model, messages=messages, max_tokens=300) return response.choices[0].message.content except Exception as e: return f"Error processing text with GPT: {str(e)}" def process_text(text_input): if text_input: st.session_state.messages.append({"role": "user", "content": text_input}) with st.chat_message("user"): st.markdown(text_input) with st.chat_message("assistant"): completion = client.chat.completions.create(model=st.session_state["openai_model"], messages=[{"role": m["role"], "content": m["content"]} for m in st.session_state.messages], stream=False) return_text = completion.choices[0].message.content st.write("Assistant: " + return_text) filename = generate_filename(text_input, "md") with open(filename, "w", encoding="utf-8") as f: f.write(text_input + "\n\n" + return_text) st.session_state.messages.append({"role": "assistant", "content": return_text}) return return_text def process_audio(audio_input, text_input=''): if isinstance(audio_input, str): with open(audio_input, "rb") as file: audio_input = file.read() transcription = client.audio.transcriptions.create(model="whisper-1", file=audio_input) st.session_state.messages.append({"role": "user", "content": transcription.text}) with st.chat_message("assistant"): st.markdown(transcription.text) SpeechSynthesis(transcription.text) filename = generate_filename(transcription.text, "wav") create_audio_file(filename, audio_input, True) filename = generate_filename(transcription.text, "md") with open(filename, "w", encoding="utf-8") as f: f.write(transcription.text + "\n\n" + transcription.text) return transcription.text def process_video(video_path, user_prompt): base64Frames, audio_path = process_video_frames(video_path) with open(video_path, "rb") as file: transcription = client.audio.transcriptions.create(model="whisper-1", file=file) response = client.chat.completions.create( model=st.session_state["openai_model"], messages=[ {"role": "system", "content": "You are generating a video summary. Create a summary of the provided video and its transcript. Respond in Markdown"}, {"role": "user", "content": [ "These are the frames from the video.", *map(lambda x: {"type": "image_url", "image_url": {"url": f'data:image/jpg;base64,{x}', "detail": "low"}}, base64Frames), {"type": "text", "text": f"The audio transcription is: {transcription.text}\n\n{user_prompt}"} ]} ], temperature=0, ) video_response = response.choices[0].message.content filename_md = generate_filename(video_path + '- ' + video_response, "md") with open(filename_md, "w", encoding="utf-8") as f: f.write(video_response) return video_response def process_video_frames(video_path, seconds_per_frame=2): base64Frames = [] base_video_path, _ = os.path.splitext(video_path) base_video_path = video_path video = cv2.VideoCapture(video_path) total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT)) fps = video.get(cv2.CAP_PROP_FPS) frames_to_skip = int(fps * seconds_per_frame) curr_frame = 0 while curr_frame < total_frames - 1: video.set(cv2.CAP_PROP_POS_FRAMES, curr_frame) success, frame = video.read() if not success: break _, buffer = cv2.imencode(".jpg", frame) base64Frames.append(base64.b64encode(buffer).decode("utf-8")) curr_frame += frames_to_skip video.release() audio_path = f"{base_video_path}.mp3" try: clip = VideoFileClip(video_path) clip.audio.write_audiofile(audio_path, bitrate="32k") clip.audio.close() clip.close() except: logger.info("No audio track found in video.") return base64Frames, audio_path def execute_code(code): buffer = io.StringIO() try: with redirect_stdout(buffer): exec(code, {}, {}) return buffer.getvalue(), None except Exception as e: return None, str(e) finally: buffer.close() def extract_python_code(markdown_text): pattern = r"```python\s*(.*?)\s*```" matches = re.findall(pattern, markdown_text, re.DOTALL) return matches def SpeechSynthesis(result): documentHTML5 = f''' Read It Aloud

πŸ”Š Read It Aloud


''' components.html(documentHTML5, width=1280, height=300) def search_arxiv(query): start_time = time.strftime("%Y-%m-%d %H:%M:%S") client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern") response1 = client.predict(message="Hello!!", llm_results_use=5, database_choice="Semantic Search", llm_model_picked="mistralai/Mistral-7B-Instruct-v0.2", api_name="/update_with_rag_md") Question = f'### πŸ”Ž {query}\r\n' References = response1[0] References2 = response1[1] filename = generate_filename(query, "md") with open(filename, "w", encoding="utf-8") as f: f.write(Question + References + References2) st.session_state.messages.append({"role": "assistant", "content": References + References2}) response2 = client.predict(query, "mistralai/Mixtral-8x7B-Instruct-v0.1", True, api_name="/ask_llm") if len(response2) > 10: Answer = response2 SpeechSynthesis(Answer) results = Question + '\r\n' + Answer + '\r\n' + References + '\r\n' + References2 return results return References + References2 roleplaying_glossary = { "πŸ€– AI Concepts": { "MoE (Mixture of Experts) 🧠": [ "As a leading AI health researcher, provide an overview of MoE, MAS, memory, and mirroring in healthcare applications.", "Explain how MoE and MAS can be leveraged to create AGI and AMI systems for healthcare, as an AI architect." ], "Multi Agent Systems (MAS) 🀝": [ "As a renowned MAS researcher, describe the key characteristics of distributed, autonomous, and cooperative MAS.", "Discuss how MAS is applied in robotics, simulations, and decentralized problem-solving, as an AI engineer." ] } } def display_glossary_grid(roleplaying_glossary): search_urls = { "πŸš€πŸŒŒArXiv": lambda k: f"/?q={quote(k)}", "πŸ“–": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}", "πŸ”": lambda k: f"https://www.google.com/search?q={quote(k)}" } for category, details in roleplaying_glossary.items(): st.write(f"### {category}") cols = st.columns(len(details)) for idx, (game, terms) in enumerate(details.items()): with cols[idx]: st.markdown(f"#### {game}") for term in terms: links_md = ' '.join([f"[{emoji}]({url(term)})" for emoji, url in search_urls.items()]) st.markdown(f"**{term}** {links_md}", unsafe_allow_html=True) def create_zip_of_files(files): zip_name = "assets.zip" with zipfile.ZipFile(zip_name, 'w') as zipf: for file in files: zipf.write(file) return zip_name def get_zip_download_link(zip_file): with open(zip_file, 'rb') as f: data = f.read() b64 = base64.b64encode(data).decode() return f'Download All' def FileSidebar(): all_files = glob.glob("*.md") all_files = [file for file in all_files if len(os.path.splitext(file)[0]) >= 10] all_files.sort(key=lambda x: (os.path.splitext(x)[1], x), reverse=True) Files1, Files2 = st.sidebar.columns(2) with Files1: if st.button("πŸ—‘ Delete All"): for file in all_files: os.remove(file) st.rerun() with Files2: if st.button("⬇️ Download"): zip_file = create_zip_of_files(all_files) st.sidebar.markdown(get_zip_download_link(zip_file), unsafe_allow_html=True) file_contents = '' file_name = '' next_action = '' for file in all_files: col1, col2, col3, col4, col5 = st.sidebar.columns([1, 6, 1, 1, 1]) with col1: if st.button("🌐", key=f"md_{file}"): with open(file, "r", encoding='utf-8') as f: file_contents = f.read() file_name = file next_action = 'md' st.session_state['next_action'] = next_action with col2: st.markdown(get_download_link(file, "text/markdown", file)) with col3: if st.button("πŸ“‚", key=f"open_{file}"): with open(file, "r", encoding='utf-8') as f: file_contents = f.read() file_name = file next_action = 'open' st.session_state['lastfilename'] = file st.session_state['filename'] = file st.session_state['filetext'] = file_contents st.session_state['next_action'] = next_action with col4: if st.button("▢️", key=f"read_{file}"): with open(file, "r", encoding='utf-8') as f: file_contents = f.read() file_name = file next_action = 'search' st.session_state['next_action'] = next_action with col5: if st.button("πŸ—‘", key=f"delete_{file}"): os.remove(file) file_name = file st.rerun() next_action = 'delete' st.session_state['next_action'] = next_action if len(file_contents) > 0: if next_action == 'open': if 'lastfilename' not in st.session_state: st.session_state['lastfilename'] = '' if 'filename' not in st.session_state: st.session_state['filename'] = '' if 'filetext' not in st.session_state: st.session_state['filetext'] = '' open1, open2 = st.columns([.8, .2]) with open1: file_name_input = st.text_input(key='file_name_input', label="File Name:", value=file_name) file_content_area = st.text_area(key='file_content_area', label="File Contents:", value=file_contents, height=300) if file_name_input != file_name: os.rename(file_name, file_name_input) st.markdown(f'Renamed file {file_name} to {file_name_input}.') if file_content_area != file_contents: with open(file_name_input, 'w', encoding='utf-8') as f: f.write(file_content_area) st.markdown(f'Saved {file_name_input}.') if next_action == 'search': st.text_area("File Contents:", file_contents, height=500) filesearch = "Create a streamlit python user app with full code listing: " + file_contents st.markdown(filesearch) if st.button(key='rerun', label='πŸ”Re-Code'): result = search_arxiv(filesearch) st.markdown(result) if next_action == 'md': st.markdown(file_contents) SpeechSynthesis(file_contents) FileSidebar() tabs = st.tabs(["Camera πŸ“·", "Download πŸ“₯", "OCR πŸ”", "Build 🌱", "Image Gen 🎨", "PDF πŸ“„", "Image πŸ–ΌοΈ", "Audio 🎡", "Video πŸŽ₯", "Code πŸ§‘β€πŸ’»", "Gallery πŸ“š", "Search πŸ”Ž"]) (tab_camera, tab_download, tab_ocr, tab_build, tab_imggen, tab_pdf, tab_image, tab_audio, tab_video, tab_code, tab_gallery, tab_search) = tabs with tab_camera: st.header("Camera Snap πŸ“·") cols = st.columns(2) for i, cam_key in enumerate(["cam0", "cam1"]): with cols[i]: cam_img = st.camera_input(f"Take a picture - Cam {i}", key=cam_key) if cam_img: filename = generate_filename(f"cam{i}", "png") if st.session_state[f'cam{i}_file'] and os.path.exists(st.session_state[f'cam{i}_file']): os.remove(st.session_state[f'cam{i}_file']) with open(filename, "wb") as f: f.write(cam_img.getvalue()) st.session_state[f'cam{i}_file'] = filename st.session_state['history'].append(f"Snapshot from Cam {i}: {filename}") st.image(Image.open(filename), caption=f"Camera {i}", use_container_width=True) with tab_download: st.header("Download PDFs πŸ“₯") if st.button("Examples πŸ“š"): example_urls = ["https://arxiv.org/pdf/2308.03892", "https://arxiv.org/pdf/1912.01703"] st.session_state['pdf_urls'] = "\n".join(example_urls) url_input = st.text_area("Enter PDF URLs (one per line)", value=st.session_state.get('pdf_urls', ""), height=200) if st.button("Robo-Download πŸ€–"): urls = url_input.strip().split("\n") progress_bar = st.progress(0) for idx, url in enumerate(urls): if url: output_path = pdf_url_to_filename(url) if download_pdf(url, output_path): st.session_state['downloaded_pdfs'][url] = output_path st.session_state['history'].append(f"Downloaded PDF: {output_path}") st.session_state['asset_checkboxes'][output_path] = True progress_bar.progress((idx + 1) / len(urls)) with tab_ocr: st.header("Test OCR πŸ”") all_files = get_gallery_files() if all_files: ocr_files = [f for f in all_files if f.endswith(('.png', '.pdf'))] if st.button("OCR All Assets πŸš€"): full_text = "# OCR Results\n\n" for file in ocr_files: if file.endswith('.png'): image = Image.open(file) else: try: doc = fitz.open(file) pix = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) doc.close() except Exception as e: st.error(f"Failed to process {file}: {str(e)}") continue output_file = generate_filename(f"ocr_{os.path.basename(file)}", "txt") result = asyncio.run(process_ocr(image, output_file)) full_text += f"## {os.path.basename(file)}\n\n{result}\n\n" st.session_state['history'].append(f"OCR Test: {file} -> {output_file}") md_output_file = generate_filename("full_ocr", "md") with open(md_output_file, "w") as f: f.write(full_text) st.success(f"Full OCR saved to {md_output_file}") st.markdown(get_download_link(md_output_file, "text/markdown", "Download Full OCR Markdown"), unsafe_allow_html=True) selected_file = st.selectbox("Select Image or PDF", ocr_files, key="ocr_select") if selected_file: if selected_file.endswith('.png'): image = Image.open(selected_file) else: try: doc = fitz.open(selected_file) pix = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) doc.close() except Exception as e: st.error(f"Cannot process {selected_file}: {str(e)}. Please select a PNG or PDF file.") image = None if image: st.image(image, caption="Input Image", use_container_width=True) if st.button("Run OCR πŸš€", key="ocr_run"): output_file = generate_filename("ocr_output", "txt") result = asyncio.run(process_ocr(image, output_file)) st.text_area("OCR Result", result, height=200) st.session_state['history'].append(f"OCR Test: {selected_file} -> {output_file}") else: st.warning("No assets in gallery yet. Use Camera Snap or Download PDFs!") with tab_build: st.header("Build Titan 🌱") model_type = st.selectbox("Model Type", ["Causal LM", "Diffusion"], key="build_type") base_model = st.selectbox("Select Model", ["HuggingFaceTB/SmolLM-135M", "Qwen/Qwen1.5-0.5B-Chat"] if model_type == "Causal LM" else ["OFA-Sys/small-stable-diffusion-v0", "stabilityai/stable-diffusion-2-base"]) model_name = st.text_input("Model Name", f"tiny-titan-{int(time.time())}") if st.button("Download Model ⬇️"): config = (ModelConfig if model_type == "Causal LM" else DiffusionConfig)(name=model_name, base_model=base_model, size="small") builder = ModelBuilder() if model_type == "Causal LM" else DiffusionBuilder() builder.load_model(base_model, config) builder.save_model(config.model_path) st.session_state['builder'] = builder st.session_state['model_loaded'] = True with tab_imggen: st.header("Test Image Gen 🎨") prompt = st.text_area("Prompt", "Generate a futuristic cityscape") if st.button("Run Image Gen πŸš€"): output_file = generate_filename("gen_output", "png", prompt=prompt) result = asyncio.run(process_image_gen(prompt, output_file)) st.image(result, caption="Generated Image", use_container_width=True) st.session_state['history'].append(f"Image Gen Test: {prompt} -> {output_file}") with tab_pdf: st.header("PDF Process πŸ“„") uploaded_pdfs = st.file_uploader("Upload PDFs", type=["pdf"], accept_multiple_files=True) view_mode = st.selectbox("View Mode", ["Single Page", "Two Pages"], key="pdf_view_mode") if st.button("Process PDFs"): for pdf_file in uploaded_pdfs: pdf_path = generate_filename(pdf_file.name, "pdf") with open(pdf_path, "wb") as f: f.write(pdf_file.read()) snapshots = asyncio.run(process_pdf_snapshot(pdf_path, "twopage" if view_mode == "Two Pages" else "single")) for snapshot in snapshots: st.image(Image.open(snapshot), caption=snapshot) text = process_image_with_prompt(Image.open(snapshot), "Extract the electronic text from image") st.text_area(f"Extracted Text from {snapshot}", text) code_prompt = f"Generate Python code based on this text:\n\n{text}" code = process_text_with_prompt(text, code_prompt) st.code(code, language="python") if st.button(f"Execute Code from {snapshot}"): output, error = execute_code(code) if error: st.error(f"Error: {error}") else: st.success(f"Output: {output or 'No output'}") with tab_image: st.header("Image Process πŸ–ΌοΈ") uploaded_images = st.file_uploader("Upload Images", type=["png", "jpg"], accept_multiple_files=True) prompt = st.text_input("Prompt", "Extract the electronic text from image") if st.button("Process Images"): for img_file in uploaded_images: img = Image.open(img_file) st.image(img, caption=img_file.name) result = process_image_with_prompt(img, prompt) st.text_area(f"Result for {img_file.name}", result) with tab_audio: st.header("Audio Process 🎡") audio_bytes = audio_recorder() if audio_bytes: filename = generate_filename("recording", "wav") with open(filename, "wb") as f: f.write(audio_bytes) st.audio(filename) process_audio(filename) with tab_video: st.header("Video Process πŸŽ₯") video_input = st.file_uploader("Upload Video", type=["mp4"]) if video_input: video_path = generate_filename(video_input.name, "mp4") with open(video_path, "wb") as f: f.write(video_input.read()) st.video(video_path) result = process_video(video_path, "Summarize this video in markdown") st.markdown(result) with tab_code: st.header("Code Executor πŸ§‘β€πŸ’»") uploaded_file = st.file_uploader("πŸ“€ Upload a Python (.py) or Markdown (.md) file", type=['py', 'md']) if 'code' not in st.session_state: st.session_state.code = '''import streamlit as st\nst.write("Hello, World!")''' if uploaded_file: content = uploaded_file.getvalue().decode() if uploaded_file.type == "text/markdown": code_blocks = extract_python_code(content) code_input = code_blocks[0] if code_blocks else "" else: code_input = content else: code_input = st.text_area("Python Code", value=st.session_state.code, height=400) col1, col2 = st.columns([1, 1]) with col1: if st.button("▢️ Run Code"): output, error = execute_code(code_input) if error: st.error(f"Error: {error}") else: st.success(f"Output: {output or 'No output'}") with col2: if st.button("πŸ—‘οΈ Clear Code"): st.session_state.code = "" st.rerun() with tab_gallery: st.header("Gallery πŸ“š") all_files = get_gallery_files() for file in all_files: if file.endswith('.png'): st.image(Image.open(file), caption=file) elif file.endswith('.pdf'): doc = fitz.open(file) pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5)) st.image(Image.frombytes("RGB", [pix.width, pix.height], pix.samples), caption=file) doc.close() elif file.endswith('.md'): with open(file, "r") as f: st.markdown(f.read()) elif file.endswith('.wav'): st.audio(file) elif file.endswith('.mp4'): st.video(file) with tab_search: st.header("ArXiv Search πŸ”Ž") query = st.text_input("Search ArXiv", "") if query: result = search_arxiv(query) st.markdown(result) st.sidebar.subheader("Gallery Settings") st.session_state['gallery_size'] = st.sidebar.slider("Gallery Size", 1, 10, st.session_state['gallery_size'], key="gallery_size_slider") display_gpu_info() st.sidebar.subheader("Action Logs πŸ“œ") for record in log_records: st.sidebar.write(f"{record.asctime} - {record.levelname} - {record.message}") st.sidebar.subheader("History πŸ“œ") for entry in st.session_state.get("history", []): if entry: st.sidebar.write(entry) def update_gallery(): container = st.session_state['asset_gallery_container'] container.empty() all_files = get_gallery_files() if all_files: container.markdown("### Asset Gallery πŸ“ΈπŸ“–") cols = container.columns(2) for idx, file in enumerate(all_files[:st.session_state['gallery_size']]): with cols[idx % 2]: if file.endswith('.png'): st.image(Image.open(file), caption=os.path.basename(file)) elif file.endswith('.pdf'): doc = fitz.open(file) pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5)) st.image(Image.frombytes("RGB", [pix.width, pix.height], pix.samples), caption=os.path.basename(file)) doc.close() st.checkbox("Select", key=f"asset_{file}", value=st.session_state['asset_checkboxes'].get(file, False)) st.markdown(get_download_link(file, "application/octet-stream", "Download"), unsafe_allow_html=True) if st.button("Delete", key=f"delete_{file}"): os.remove(file) st.session_state['asset_checkboxes'].pop(file, None) st.experimental_rerun() update_gallery() if prompt := st.chat_input("GPT-4o Multimodal ChatBot - What can I help you with?"): st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) with st.chat_message("assistant"): completion = client.chat.completions.create(model=st.session_state["openai_model"], messages=st.session_state.messages, stream=True) response = "" for chunk in completion: if chunk.choices[0].delta.content: response += chunk.choices[0].delta.content st.write(response) st.session_state.messages.append({"role": "assistant", "content": response}) def create_audio_file(filename, audio_input, flag): with open(filename, "wb") as f: f.write(audio_input)