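"""AI Document Processor: a Streamlit app that captures PDFs and images, renders PDF
pages to PNG with PyMuPDF, extracts their text with a vision-capable GPT model, and
manages the resulting Markdown files (including GPT summarization) from a sidebar
asset gallery."""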

import os
import glob
import base64
import time
import streamlit as st
import fitz
import requests
from PIL import Image
import asyncio
import aiofiles
from io import BytesIO
import zipfile
import random
import re
from openai import OpenAI
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

st.set_page_config(
    page_title="AI Document Processor 🚀",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
)

if 'history' not in st.session_state:
    st.session_state['history'] = []
if 'processing' not in st.session_state:
    st.session_state['processing'] = {}
if 'asset_checkboxes' not in st.session_state:
    st.session_state['asset_checkboxes'] = {}
if 'unique_counter' not in st.session_state:
    st.session_state['unique_counter'] = 0
if 'messages' not in st.session_state:
    st.session_state['messages'] = []

# OpenAI client setup; expects OPENAI_API_KEY (and optionally OPENAI_ORG_ID) in the environment.
openai_api_key = os.getenv('OPENAI_API_KEY')
openai_org_id = os.getenv('OPENAI_ORG_ID')
client = OpenAI(api_key=openai_api_key, organization=openai_org_id)
GPT_MODEL = "gpt-4o-2024-05-13"
GPT_MINI_MODEL = "o3-mini"


def generate_filename(sequence, ext="png"):
    """Return a timestamped filename, e.g. 'page_0_01012025123000.png'."""
    timestamp = time.strftime("%d%m%Y%H%M%S")
    return f"{sequence}_{timestamp}.{ext}"


def pdf_url_to_filename(url):
    """Convert a URL into a filesystem-safe .pdf filename."""
    safe_name = re.sub(r'[<>:"/\\|?*]', '_', url)
    return f"{safe_name}.pdf"


def get_download_link(file_path, mime_type="application/pdf", label="Download"):
    """Return an HTML anchor that serves `file_path` as a base64 data URI."""
    with open(file_path, 'rb') as f:
        data = f.read()
    b64 = base64.b64encode(data).decode()
    return f'<a href="data:{mime_type};base64,{b64}" download="{os.path.basename(file_path)}">{label}</a>'
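
# Note: the data-URI link above embeds the whole file in the rendered page, so very large
# files will inflate the HTML; Streamlit's st.download_button is an alternative for big assets.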


def get_gallery_files(file_types=["png", "pdf", "md"]):
    return sorted(set(f for ext in file_types for f in glob.glob(f"*.{ext}")))


def get_pdf_files():
    return sorted(glob.glob("*.pdf"))


def get_md_files():
    return sorted(glob.glob("*.md"))


def download_pdf(url, output_path):
    """Stream a PDF from `url` to `output_path`; return True on success."""
    try:
        response = requests.get(url, stream=True, timeout=10)
        if response.status_code == 200:
            with open(output_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            return True
        logger.error(f"Failed to download {url}: HTTP {response.status_code}")
        return False
    except requests.RequestException as e:
        logger.error(f"Failed to download {url}: {e}")
        return False
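
# Example usage (hypothetical URL):
#   downloaded = download_pdf("https://example.com/sample.pdf",
#                             pdf_url_to_filename("https://example.com/sample.pdf"))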


async def process_pdf_to_images(pdf_path, mode="double"):
    """Render a PDF to 2x-scaled PNGs, one page per image or two pages side by side."""
    doc = fitz.open(pdf_path)
    output_files = []
    step = 2 if mode == "double" else 1
    for i in range(0, len(doc), step):
        if mode == "double" and i + 1 < len(doc):
            # Render both pages, then paste them side by side with PIL.
            pix1 = doc[i].get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
            pix2 = doc[i + 1].get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
            img1 = Image.frombytes("RGB", [pix1.width, pix1.height], pix1.samples)
            img2 = Image.frombytes("RGB", [pix2.width, pix2.height], pix2.samples)
            combined = Image.new("RGB", (img1.width + img2.width, max(img1.height, img2.height)), "white")
            combined.paste(img1, (0, 0))
            combined.paste(img2, (img1.width, 0))
            output_file = generate_filename(f"double_page_{i}", "png")
            combined.save(output_file)
            output_files.append(output_file)
        else:
            pix = doc[i].get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
            output_file = generate_filename(f"page_{i}", "png")
            pix.save(output_file)
            output_files.append(output_file)
    doc.close()
    return output_files


async def extract_text_from_image(image_path, prompt="Extract the electronic text from this image"):
    """Send an image to the vision model with `prompt` and return the extracted text."""
    with open(image_path, "rb") as image_file:
        base64_image = base64.b64encode(image_file.read()).decode("utf-8")
    response = client.chat.completions.create(
        model=GPT_MODEL,
        messages=[{"role": "user", "content": [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
        ]}],
        temperature=0.0,
    )
    return response.choices[0].message.content


def update_gallery():
    """Render up to four captured assets in the sidebar with download links."""
    all_files = get_gallery_files()
    if all_files:
        st.sidebar.subheader("Asset Gallery 📸📖")
        cols = st.sidebar.columns(2)
        for idx, file in enumerate(all_files[:4]):
            with cols[idx % 2]:
                st.session_state['unique_counter'] += 1
                unique_id = st.session_state['unique_counter']
                if file.endswith('.png'):
                    st.image(Image.open(file), caption=os.path.basename(file), use_container_width=True)
                elif file.endswith('.pdf'):
                    doc = fitz.open(file)
                    pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5))
                    img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                    st.image(img, caption=os.path.basename(file), use_container_width=True)
                    doc.close()
                else:
                    st.write(f"📜 {os.path.basename(file)}")
                st.markdown(get_download_link(file, "application/octet-stream", "Download"), unsafe_allow_html=True)


st.title("AI Document Processor 🚀")

st.sidebar.header("Captured Files 📜")
if st.sidebar.button("Zap All! 🗑️"):
    for file in get_gallery_files():
        os.remove(file)
    st.session_state['asset_checkboxes'].clear()
    st.sidebar.success("All assets vaporized! 💨")
    st.rerun()
update_gallery()

tab1, tab2, tab3 = st.tabs(["PDF Processing 📖", "Image Processing 🖼️", "Markdown Management 📝"])

with tab1:
    st.header("PDF Processing 📖")
    pdf_files = st.file_uploader("Upload PDFs", type=["pdf"], accept_multiple_files=True)
    if pdf_files and st.button("Process PDFs"):
        for pdf_file in pdf_files:
            pdf_path = f"uploaded_{pdf_file.name}"
            with open(pdf_path, "wb") as f:
                f.write(pdf_file.getvalue())
            images = asyncio.run(process_pdf_to_images(pdf_path, mode="double"))
            full_text = ""
            for page_num, img in enumerate(images, start=1):
                text = asyncio.run(extract_text_from_image(img))
                full_text += f"# Page {page_num}\n\n{text}\n\n"
            md_file = f"{os.path.splitext(pdf_path)[0]}.md"
            with open(md_file, "w", encoding="utf-8") as f:
                f.write(full_text)
            st.image([Image.open(img) for img in images], caption=images, width=300)
            st.markdown(get_download_link(md_file, "text/markdown", "Download Markdown"), unsafe_allow_html=True)
        update_gallery()

with tab2:
    st.header("Image Processing 🖼️")
    prompt = st.text_area("Enter Prompt for Images", "Extract the electronic text from this image")
    image_files = st.file_uploader("Upload Images", type=["png", "jpg", "jpeg"], accept_multiple_files=True)
    if image_files and st.button("Process Images"):
        full_text = ""
        for img_file in image_files:
            img_path = f"uploaded_{img_file.name}"
            with open(img_path, "wb") as f:
                f.write(img_file.getvalue())
            text = asyncio.run(extract_text_from_image(img_path, prompt))
            full_text += f"# {img_file.name}\n\n{text}\n\n"
            st.image(Image.open(img_path), caption=img_file.name, width=300)
        md_file = generate_filename("image_ocr", "md")
        with open(md_file, "w", encoding="utf-8") as f:
            f.write(full_text)
        st.markdown(get_download_link(md_file, "text/markdown", "Download Markdown"), unsafe_allow_html=True)
        update_gallery()

with tab3:
    st.header("Markdown Management 📝")
    md_files = get_md_files()
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("File Listing")
        selected_files = []
        for md_file in md_files:
            if st.checkbox(md_file, key=f"md_{md_file}"):
                selected_files.append(md_file)
    with col2:
        st.subheader("Process Selected Files")
        default_prompt = "Summarize this into markdown outline with emojis and number the topics 1..12"
        prompt = st.text_area("Enter Prompt", default_prompt)
        if st.button("Process with GPT") and selected_files:
            combined_text = ""
            for md_file in selected_files:
                with open(md_file, "r", encoding="utf-8") as f:
                    combined_text += f.read() + "\n\n"
            # Reasoning models such as o3-mini do not accept a temperature setting.
            response = client.chat.completions.create(
                model=GPT_MINI_MODEL,
                messages=[{"role": "user", "content": f"{prompt}\n\n{combined_text}"}],
            )
            output_md = generate_filename("gpt_output", "md")
            with open(output_md, "w", encoding="utf-8") as f:
                f.write(response.choices[0].message.content)
            st.markdown(response.choices[0].message.content)
            st.markdown(get_download_link(output_md, "text/markdown", "Download Output"), unsafe_allow_html=True)
            update_gallery()

update_gallery()
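
# To run locally (assuming this file is saved as app.py, OPENAI_API_KEY is set in the
# environment, and the imported packages, e.g. streamlit, pymupdf, pillow, requests,
# openai, and aiofiles, are installed):
#   streamlit run app.py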