#!/usr/bin/env python3
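# AI Document Processor: a Streamlit app that converts PDFs and images to markdown
# via GPT vision OCR and lets the user post-process the markdown with GPT.
# All generated assets (.png page renders, .pdf, .md) are written to the current
# working directory and surfaced in the sidebar gallery.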
import os
import glob
import base64
import time
import streamlit as st
import fitz
import requests
from PIL import Image
import asyncio
import aiofiles
from io import BytesIO
import zipfile
import random
import re
from openai import OpenAI
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
st.set_page_config(
page_title="AI Document Processor 🚀",
page_icon="🤖",
layout="wide",
initial_sidebar_state="expanded",
)
# Session state initialization
if 'history' not in st.session_state:
    st.session_state['history'] = []
if 'processing' not in st.session_state:
    st.session_state['processing'] = {}
if 'asset_checkboxes' not in st.session_state:
    st.session_state['asset_checkboxes'] = {}
if 'unique_counter' not in st.session_state:
    st.session_state['unique_counter'] = 0
if 'messages' not in st.session_state:
    st.session_state['messages'] = []
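# Note: 'history', 'processing', and 'messages' are initialized but not referenced
# elsewhere in this file; they appear to be reserved for chat/history features.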
# OpenAI setup
openai_api_key = os.getenv('OPENAI_API_KEY')
openai_org_id = os.getenv('OPENAI_ORG_ID')
client = OpenAI(api_key=openai_api_key, organization=openai_org_id)
GPT_MODEL = "gpt-4o-2024-05-13"
GPT_MINI_MODEL = "o3-mini-high"  # Placeholder, not an actual API model id; adjust (e.g. "o3-mini") before use
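# GPT_MODEL handles the vision/OCR calls below; GPT_MINI_MODEL is only used for the
# text-only summarization in tab 3, so any capable text model id can be substituted there.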
def generate_filename(sequence, ext="png"):
    timestamp = time.strftime("%d%m%Y%H%M%S")
    return f"{sequence}_{timestamp}.{ext}"

def pdf_url_to_filename(url):
    safe_name = re.sub(r'[<>:"/\\|?*]', '_', url)
    return f"{safe_name}.pdf"
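# Illustrative examples (timestamp format is ddmmYYYYHHMMSS):
#   generate_filename("page_0")                    -> "page_0_17042025153000.png"
#   pdf_url_to_filename("https://arxiv.org/pdf/1") -> "https___arxiv.org_pdf_1.pdf"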
def get_download_link(file_path, mime_type="application/pdf", label="Download"):
    with open(file_path, 'rb') as f:
        data = f.read()
    b64 = base64.b64encode(data).decode()
    return f'<a href="data:{mime_type};base64,{b64}" download="{os.path.basename(file_path)}">{label}</a>'

def get_gallery_files(file_types=["png", "pdf", "md"]):
    return sorted(list(set([f for ext in file_types for f in glob.glob(f"*.{ext}")])))

def get_pdf_files():
    return sorted(glob.glob("*.pdf"))

def get_md_files():
    return sorted(glob.glob("*.md"))
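# The gallery helpers scan the flat working directory only, and get_download_link
# inlines the whole file as a base64 data URI, so it is best suited to small assets.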
def download_pdf(url, output_path):
    try:
        response = requests.get(url, stream=True, timeout=10)
        if response.status_code == 200:
            with open(output_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            return True
    except requests.RequestException as e:
        logger.error(f"Failed to download {url}: {e}")
    return False
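# pdf_url_to_filename() and download_pdf() are not wired into the UI below; they look
# like helpers for a URL-ingestion flow (download a PDF by URL into the gallery).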
async def process_pdf_to_images(pdf_path, mode="double"):
    doc = fitz.open(pdf_path)
    output_files = []
    step = 2 if mode == "double" else 1
    for i in range(0, len(doc), step):
        if mode == "double" and i + 1 < len(doc):
            # Combine two pages side by side into one image. fitz.Pixmap.set_rect()
            # fills a rectangle with a color rather than copying pixels, so the two
            # page renderings are stitched together with PIL instead.
            pix1 = doc[i].get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
            pix2 = doc[i + 1].get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
            img1 = Image.frombytes("RGB", (pix1.width, pix1.height), pix1.samples)
            img2 = Image.frombytes("RGB", (pix2.width, pix2.height), pix2.samples)
            combined = Image.new("RGB", (img1.width + img2.width, max(img1.height, img2.height)), "white")
            combined.paste(img1, (0, 0))
            combined.paste(img2, (img1.width, 0))
            output_file = generate_filename(f"double_page_{i}", "png")
            combined.save(output_file)
            output_files.append(output_file)
        else:
            page = doc[i]
            pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
            output_file = generate_filename(f"page_{i}", "png")
            pix.save(output_file)
            output_files.append(output_file)
    doc.close()
    return output_files
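# Usage sketch (illustrative): the coroutine never awaits anything, so the callers
# below simply drive it with asyncio.run(), e.g.
#   images = asyncio.run(process_pdf_to_images("uploaded_report.pdf", mode="double"))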
async def extract_text_from_image(image_path, prompt="Extract the electronic text from this image"):
    with open(image_path, "rb") as image_file:
        base64_image = base64.b64encode(image_file.read()).decode("utf-8")
    response = client.chat.completions.create(
        model=GPT_MODEL,
        messages=[{"role": "user", "content": [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}]}],
        temperature=0.0
    )
    return response.choices[0].message.content
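# Usage sketch (illustrative): each image is base64-encoded into a data URL and sent
# as an image_url content part, e.g.
#   text = asyncio.run(extract_text_from_image("double_page_0_17042025153000.png"))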
def update_gallery():
    all_files = get_gallery_files()
    if all_files:
        st.sidebar.subheader("Asset Gallery 📸📖")
        cols = st.sidebar.columns(2)
        for idx, file in enumerate(all_files[:4]):  # Limit to 4 for brevity
            with cols[idx % 2]:
                st.session_state['unique_counter'] += 1
                unique_id = st.session_state['unique_counter']
                if file.endswith('.png'):
                    st.image(Image.open(file), caption=os.path.basename(file), use_container_width=True)
                elif file.endswith('.pdf'):
                    doc = fitz.open(file)
                    pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5))
                    img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
                    st.image(img, caption=os.path.basename(file), use_container_width=True)
                    doc.close()
                else:  # .md files
                    st.write(f"📜 {os.path.basename(file)}")
                st.markdown(get_download_link(file, "application/octet-stream", "Download"), unsafe_allow_html=True)
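# The sidebar gallery shows at most four assets: PNGs directly, PDFs as a half-scale
# render of their first page, markdown files by name; each entry gets a download link.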
st.title("AI Document Processor 🚀")

# Sidebar
st.sidebar.header("Captured Files 📜")
if st.sidebar.button("Zap All! 🗑️"):
    for file in get_gallery_files():
        os.remove(file)
    st.session_state['asset_checkboxes'].clear()
    st.sidebar.success("All assets vaporized! 💨")
    st.rerun()
update_gallery()
tab1, tab2, tab3 = st.tabs(["PDF Processing 📖", "Image Processing 🖼️", "Markdown Management 📝"])
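# Tab 1: PDF -> page images (two pages per image) -> GPT OCR -> one .md per PDF.
# Tab 2: uploaded images -> GPT OCR with a user-editable prompt -> one combined .md.
# Tab 3: pick existing .md files and run a GPT prompt (summary/outline) over them.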
with tab1:
    st.header("PDF Processing 📖")
    pdf_files = st.file_uploader("Upload PDFs", type=["pdf"], accept_multiple_files=True)
    if pdf_files and st.button("Process PDFs"):
        for pdf_file in pdf_files:
            pdf_path = f"uploaded_{pdf_file.name}"
            with open(pdf_path, "wb") as f:
                f.write(pdf_file.getvalue())
            images = asyncio.run(process_pdf_to_images(pdf_path, mode="double"))
            full_text = ""
            for page_num, img in enumerate(images, start=1):
                text = asyncio.run(extract_text_from_image(img))
                full_text += f"# Page {page_num}\n\n{text}\n\n"
            md_file = f"{os.path.splitext(pdf_path)[0]}.md"
            with open(md_file, "w") as f:
                f.write(full_text)
            st.image([Image.open(img) for img in images], caption=images, width=300)
            st.markdown(get_download_link(md_file, "text/markdown", "Download Markdown"), unsafe_allow_html=True)
        update_gallery()
with tab2:
    st.header("Image Processing 🖼️")
    prompt = st.text_area("Enter Prompt for Images", "Extract the electronic text from this image")
    image_files = st.file_uploader("Upload Images", type=["png", "jpg", "jpeg"], accept_multiple_files=True)
    if image_files and st.button("Process Images"):
        full_text = ""
        for img_file in image_files:
            img_path = f"uploaded_{img_file.name}"
            with open(img_path, "wb") as f:
                f.write(img_file.getvalue())
            text = asyncio.run(extract_text_from_image(img_path, prompt))
            full_text += f"# {img_file.name}\n\n{text}\n\n"
            st.image(Image.open(img_path), caption=img_file.name, width=300)
        md_file = generate_filename("image_ocr", "md")
        with open(md_file, "w") as f:
            f.write(full_text)
        st.markdown(get_download_link(md_file, "text/markdown", "Download Markdown"), unsafe_allow_html=True)
        update_gallery()
with tab3:
    st.header("Markdown Management 📝")
    md_files = get_md_files()
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("File Listing")
        selected_files = []
        for md_file in md_files:
            if st.checkbox(md_file, key=f"md_{md_file}"):
                selected_files.append(md_file)
    with col2:
        st.subheader("Process Selected Files")
        default_prompt = "Summarize this into markdown outline with emojis and number the topics 1..12"
        prompt = st.text_area("Enter Prompt", default_prompt)
        if st.button("Process with GPT") and selected_files:
            combined_text = ""
            for md_file in selected_files:
                with open(md_file, "r") as f:
                    combined_text += f.read() + "\n\n"
            response = client.chat.completions.create(
                model=GPT_MINI_MODEL,  # Replace with an actual model id; reasoning models (o1/o3 family) also reject `temperature`
                messages=[{"role": "user", "content": f"{prompt}\n\n{combined_text}"}],
                temperature=0.0
            )
            output_md = generate_filename("gpt_output", "md")
            with open(output_md, "w") as f:
                f.write(response.choices[0].message.content)
            st.markdown(response.choices[0].message.content)
            st.markdown(get_download_link(output_md, "text/markdown", "Download Output"), unsafe_allow_html=True)
            update_gallery()
update_gallery()