#!/usr/bin/env python3 import os import glob import base64 import time import streamlit as st import fitz import requests from PIL import Image import asyncio import aiofiles from io import BytesIO import zipfile import random import re from openai import OpenAI import logging logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) st.set_page_config( page_title="AI Document Processor πŸš€", page_icon="πŸ€–", layout="wide", initial_sidebar_state="expanded", ) # Session state initialization if 'history' not in st.session_state: st.session_state['history'] = [] if 'processing' not in st.session_state: st.session_state['processing'] = {} if 'asset_checkboxes' not in st.session_state: st.session_state['asset_checkboxes'] = {} if 'unique_counter' not in st.session_state: st.session_state['unique_counter'] = 0 if 'messages' not in st.session_state: st.session_state['messages'] = [] # OpenAI setup openai_api_key = os.getenv('OPENAI_API_KEY') openai_org_id = os.getenv('OPENAI_ORG_ID') client = OpenAI(api_key=openai_api_key, organization=openai_org_id) GPT_MODEL = "gpt-4o-2024-05-13" GPT_MINI_MODEL = "o3-mini-high" # Placeholder, adjust as per actual model name def generate_filename(sequence, ext="png"): timestamp = time.strftime("%d%m%Y%H%M%S") return f"{sequence}_{timestamp}.{ext}" def pdf_url_to_filename(url): safe_name = re.sub(r'[<>:"/\\|?*]', '_', url) return f"{safe_name}.pdf" def get_download_link(file_path, mime_type="application/pdf", label="Download"): with open(file_path, 'rb') as f: data = f.read() b64 = base64.b64encode(data).decode() return f'{label}' def get_gallery_files(file_types=["png", "pdf", "md"]): return sorted(list(set([f for ext in file_types for f in glob.glob(f"*.{ext}")]))) def get_pdf_files(): return sorted(glob.glob("*.pdf")) def get_md_files(): return sorted(glob.glob("*.md")) def download_pdf(url, output_path): try: response = requests.get(url, stream=True, timeout=10) if response.status_code == 200: with open(output_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) return True except requests.RequestException as e: logger.error(f"Failed to download {url}: {e}") return False async def process_pdf_to_images(pdf_path, mode="double"): doc = fitz.open(pdf_path) output_files = [] step = 2 if mode == "double" else 1 for i in range(0, len(doc), step): if mode == "double" and i + 1 < len(doc): # Combine two pages into one image page1 = doc[i] page2 = doc[i + 1] pix1 = page1.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) pix2 = page2.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) combined_width = pix1.width + pix2.width combined_height = max(pix1.height, pix2.height) combined_pix = fitz.Pixmap(fitz.csRGB, combined_width, combined_height) combined_pix.set_rect(fitz.IRect(0, 0, pix1.width, pix1.height), pix1) combined_pix.set_rect(fitz.IRect(pix1.width, 0, combined_width, pix2.height), pix2) output_file = generate_filename(f"double_page_{i}", "png") combined_pix.save(output_file) output_files.append(output_file) else: page = doc[i] pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) output_file = generate_filename(f"page_{i}", "png") pix.save(output_file) output_files.append(output_file) doc.close() return output_files async def extract_text_from_image(image_path): with open(image_path, "rb") as image_file: base64_image = base64.b64encode(image_file.read()).decode("utf-8") response = client.chat.completions.create( model=GPT_MODEL, messages=[{"role": "user", "content": [ {"type": "text", "text": "Extract the electronic text from this image"}, {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}]}], temperature=0.0 ) return response.choices[0].message.content def update_gallery(): all_files = get_gallery_files() if all_files: st.sidebar.subheader("Asset Gallery πŸ“ΈπŸ“–") cols = st.sidebar.columns(2) for idx, file in enumerate(all_files[:4]): # Limit to 4 for brevity with cols[idx % 2]: st.session_state['unique_counter'] += 1 unique_id = st.session_state['unique_counter'] if file.endswith('.png'): st.image(Image.open(file), caption=os.path.basename(file), use_container_width=True) elif file.endswith('.pdf'): doc = fitz.open(file) pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5)) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) st.image(img, caption=os.path.basename(file), use_container_width=True) doc.close() else: # .md files st.write(f"πŸ“œ {os.path.basename(file)}") st.markdown(get_download_link(file, "application/octet-stream", "Download"), unsafe_allow_html=True) st.title("AI Document Processor πŸš€") # Sidebar st.sidebar.header("Captured Files πŸ“œ") if st.sidebar.button("Zap All! πŸ—‘οΈ"): for file in get_gallery_files(): os.remove(file) st.session_state['asset_checkboxes'].clear() st.sidebar.success("All assets vaporized! πŸ’¨") st.rerun() update_gallery() tab1, tab2, tab3 = st.tabs(["PDF Processing πŸ“–", "Image Processing πŸ–ΌοΈ", "Markdown Management πŸ“"]) with tab1: st.header("PDF Processing πŸ“–") pdf_files = st.file_uploader("Upload PDFs", type=["pdf"], accept_multiple_files=True) if pdf_files and st.button("Process PDFs"): for pdf_file in pdf_files: pdf_path = f"uploaded_{pdf_file.name}" with open(pdf_path, "wb") as f: f.write(pdf_file.getvalue()) images = asyncio.run(process_pdf_to_images(pdf_path, mode="double")) full_text = "" for img in images: text = asyncio.run(extract_text_from_image(img)) full_text += f"# Page {images.index(img) + 1}\n\n{text}\n\n" md_file = f"{os.path.splitext(pdf_path)[0]}.md" with open(md_file, "w") as f: f.write(full_text) st.image([Image.open(img) for img in images], caption=images, width=300) st.markdown(get_download_link(md_file, "text/markdown", "Download Markdown"), unsafe_allow_html=True) update_gallery() with tab2: st.header("Image Processing πŸ–ΌοΈ") prompt = st.text_area("Enter Prompt for Images", "Extract the electronic text from this image") image_files = st.file_uploader("Upload Images", type=["png", "jpg", "jpeg"], accept_multiple_files=True) if image_files and st.button("Process Images"): full_text = "" for img_file in image_files: img_path = f"uploaded_{img_file.name}" with open(img_path, "wb") as f: f.write(img_file.getvalue()) text = asyncio.run(extract_text_from_image(img_path)) full_text += f"# {img_file.name}\n\n{text}\n\n" st.image(Image.open(img_path), caption=img_file.name, width=300) md_file = generate_filename("image_ocr", "md") with open(md_file, "w") as f: f.write(full_text) st.markdown(get_download_link(md_file, "text/markdown", "Download Markdown"), unsafe_allow_html=True) update_gallery() with tab3: st.header("Markdown Management πŸ“") md_files = get_md_files() col1, col2 = st.columns(2) with col1: st.subheader("File Listing") selected_files = [] for md_file in md_files: if st.checkbox(md_file, key=f"md_{md_file}"): selected_files.append(md_file) with col2: st.subheader("Process Selected Files") default_prompt = "Summarize this into markdown outline with emojis and number the topics 1..12" prompt = st.text_area("Enter Prompt", default_prompt) if st.button("Process with GPT") and selected_files: combined_text = "" for md_file in selected_files: with open(md_file, "r") as f: combined_text += f.read() + "\n\n" response = client.chat.completions.create( model=GPT_MINI_MODEL, # Replace with actual model if different messages=[{"role": "user", "content": f"{prompt}\n\n{combined_text}"}], temperature=0.0 ) output_md = generate_filename("gpt_output", "md") with open(output_md, "w") as f: f.write(response.choices[0].message.content) st.markdown(response.choices[0].message.content) st.markdown(get_download_link(output_md, "text/markdown", "Download Output"), unsafe_allow_html=True) update_gallery() update_gallery()