# pdf2markdown / app.py — Flask service that streams PDF→Markdown conversion
# (text via pdfplumber, page images via pdf2image + Tesseract OCR, image
# hosting on a Hugging Face dataset repo).
import os
import io
import re
import logging
import subprocess
from datetime import datetime
import urllib.parse
import tempfile
import json # For streaming JSON messages
import time # For gevent.sleep
from flask import Flask, request, render_template, Response, stream_with_context
from werkzeug.utils import secure_filename
# Ensure gevent is imported and monkey patched if needed for other libraries
# from gevent import monkey
# monkey.patch_all() # Apply this early if you suspect issues with other libs
import requests # For requests.exceptions.HTTPError
from requests.exceptions import HTTPError as RequestsHTTPError # Specific import for clarity
import pdfplumber
import pdf2image # <<<<<<<<<<<<<<<< CORRECTED: Added this import
from pdf2image import convert_from_path, convert_from_bytes # Keep these for direct use too
# from pdf2image.exceptions import ... # If you need to catch specific pdf2image errors
import pytesseract
from PIL import Image
from huggingface_hub import HfApi, create_repo
# --- Flask App Initialization ---
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = tempfile.gettempdir()
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 50 MB limit for uploads, adjust as needed
# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# --- Hugging Face Configuration ---
HF_TOKEN = os.getenv("HF_TOKEN")
HF_DATASET_REPO_NAME = os.getenv("HF_DATASET_REPO_NAME", "pdf-images-extracted")
hf_api = HfApi()
# --- Helper to yield messages for streaming ---
def yield_message(msg_type, data):
    """Serialize one streaming event as a newline-delimited JSON string.

    Args:
        msg_type: Event type tag (e.g. "status", "error", "markdown_chunk").
        data: Dict of extra payload fields merged into the message.

    Returns:
        A single-line JSON object string terminated by "\\n" (NDJSON framing).
    """
    # Renamed from `type` to avoid shadowing the builtin; every call site in
    # this module passes the argument positionally, so this is compatible.
    return json.dumps({"type": msg_type, **data}) + "\n"
# --- PDF Processing Helper Functions (Adapted for Streaming) ---
def check_poppler():
    """Probe for Poppler's `pdftoppm` binary on PATH.

    Returns:
        True when the command could be executed (regardless of what it
        printed), False when the binary is missing or the probe failed.
    """
    try:
        probe = subprocess.run(
            ["pdftoppm", "-v"], capture_output=True, text=True, check=False
        )
    except FileNotFoundError:
        logger.error("Poppler (pdftoppm command) not found. Ensure poppler-utils is installed and in PATH.")
        return False
    except Exception as exc:
        logger.error(f"An unexpected error occurred during Poppler check: {str(exc)}")
        return False
    # pdftoppm prints its version banner on stderr; fall back to stdout.
    version_banner = probe.stderr.strip() if probe.stderr else probe.stdout.strip()
    if version_banner:
        logger.info(f"Poppler version check: {version_banner.splitlines()[0] if version_banner else 'No version output'}")
    else:
        logger.info("Poppler 'pdftoppm -v' ran. Assuming Poppler is present.")
    return True
def ensure_hf_dataset():
    """Ensure the Hugging Face dataset repo used for image hosting exists.

    Returns:
        The repo id string on success, or a string starting with "Error"
        describing the failure (callers test for that prefix).
    """
    if not HF_TOKEN:
        msg = "HF_TOKEN is not set. Cannot ensure Hugging Face dataset. Image uploads will fail."
        logger.warning(msg)
        return "Error: " + msg
    try:
        repo_obj = create_repo(
            repo_id=HF_DATASET_REPO_NAME,
            token=HF_TOKEN,
            repo_type="dataset",
            exist_ok=True,
        )
        logger.info(f"Dataset repo ensured: {repo_obj.repo_id}")
        return repo_obj.repo_id
    except RequestsHTTPError as e:
        # HTTP 409: the repo already exists under some namespace. Try to
        # resolve the owner via whoami so we can return "<user>/<repo>".
        if e.response is not None and e.response.status_code == 409:
            logger.info(f"Dataset repo '{HF_DATASET_REPO_NAME}' already exists (HTTP 409).")
            try:
                user_info = hf_api.whoami(token=HF_TOKEN)
                namespace = user_info.get('name') if user_info else None
            except Exception as whoami_e:
                logger.error(f"Could not determine namespace for existing repo via whoami due to: {whoami_e}. Using generic ID.")
                return HF_DATASET_REPO_NAME
            if namespace:
                return f"{namespace}/{HF_DATASET_REPO_NAME}"
            logger.warning(f"Could not determine namespace for existing repo '{HF_DATASET_REPO_NAME}'. Using generic ID.")
            return HF_DATASET_REPO_NAME
        status_code = e.response.status_code if e.response is not None else "Unknown"
        logger.error(f"Hugging Face dataset HTTP error (Status: {status_code}): {str(e)}")
        return f"Error: Failed to access or create dataset '{HF_DATASET_REPO_NAME}' due to HTTP error: {str(e)}"
    except Exception as e:
        logger.error(f"Hugging Face dataset general error: {str(e)}", exc_info=True)
        return f"Error: Failed to access or create dataset '{HF_DATASET_REPO_NAME}': {str(e)}"
def upload_image_to_hf_stream(image_pil, filename_base, page_num_for_log=""):
    """Upload a PIL image to the HF dataset repo and return its URL.

    The image is first written to a temp PNG on disk (the hub client
    uploads from a path), then pushed to
    images/<base>_<page>_<timestamp>.png in the dataset.

    Returns:
        The uploaded file URL, or a string starting with "Error" on failure.
    """
    repo_id_or_error = ensure_hf_dataset()
    if isinstance(repo_id_or_error, str) and repo_id_or_error.startswith("Error"):
        return repo_id_or_error
    repo_id = repo_id_or_error
    temp_image_path = None
    try:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        repo_filename = f"images/{filename_base}_{page_num_for_log}_{stamp}.png"
        # The scratch directory may not exist yet (e.g. first request).
        os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png", dir=app.config['UPLOAD_FOLDER']) as tmp_file:
            temp_image_path = tmp_file.name
        image_pil.save(temp_image_path, format="PNG")
        logger.info(f"Attempting to upload {temp_image_path} to {repo_id}/{repo_filename}")
        file_url = hf_api.upload_file(
            path_or_fileobj=temp_image_path,
            path_in_repo=repo_filename,
            repo_id=repo_id,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        logger.info(f"Successfully uploaded image: {file_url}")
        return file_url
    except Exception as e:
        logger.error(f"Image upload error for {filename_base}{page_num_for_log}: {str(e)}", exc_info=True)
        return f"Error uploading image {filename_base}{page_num_for_log}: {str(e)}"
    finally:
        # Best-effort cleanup of the temporary PNG.
        if temp_image_path and os.path.exists(temp_image_path):
            try:
                os.remove(temp_image_path)
            except OSError as ose:
                logger.error(f"Error removing temp image file {temp_image_path}: {ose}")
def format_page_text_to_markdown_chunk(page_text_content):
    """Convert one page of plain extracted text into a Markdown fragment.

    Heuristics:
      * an ALL-CAPS line of 6-99 characters becomes an ``##`` heading,
      * numbered ("1.") or bulleted ("*", "+", "-") lines become "- " items,
      * any other line is kept verbatim (unstripped) as its own paragraph.

    Runs of blank lines are collapsed and the fragment always ends with a
    single blank line so consecutive chunks concatenate cleanly.
    """
    normalized = re.sub(r'\n\s*\n+', '\n\n', page_text_content.strip())
    pieces = []
    inside_list = False
    for raw_line in normalized.split('\n'):
        stripped = raw_line.strip()
        if not stripped:
            pieces.append("\n")
            inside_list = False
            continue
        bullet = re.match(r'^\s*(?:(?:\d+\.)|[*+-])\s+(.*)', stripped)
        looks_like_heading = stripped.isupper() and 5 < len(stripped) < 100
        if looks_like_heading and not bullet:
            pieces.append(f"## {stripped}\n\n")
            inside_list = False
        elif bullet:
            pieces.append(f"- {bullet.group(1)}\n")
            inside_list = True
        else:
            # Close an open list with a blank line before regular prose.
            if inside_list:
                pieces.append("\n")
            # Keep the raw (unstripped) line so layout indentation survives.
            pieces.append(f"{raw_line}\n\n")
            inside_list = False
    chunk_md = "".join(pieces)
    return re.sub(r'\n\s*\n+', '\n\n', chunk_md.strip()) + "\n\n"
# --- Main PDF Processing Logic (Generator Function for Streaming) ---
def generate_pdf_conversion_stream(pdf_input_source_path_or_url):
    """Yield NDJSON progress/content events while converting a PDF to Markdown.

    Args:
        pdf_input_source_path_or_url: A local file path or an http(s) URL
            pointing at the PDF to convert.

    Yields:
        JSON strings (one per line) built by yield_message(). Event types
        emitted: "markdown_replace", "markdown_chunk", "image_md",
        "status", "error", "final_status".
    """
    try:
        # Reset the client-side document, then announce the text phase.
        yield yield_message("markdown_replace", {"content": "# Extracted PDF Content\n\n"})
        time.sleep(0.01)  # tiny pause so the client can flush between events
        yield yield_message("status", {"message": "Opening PDF for text extraction..."})
        time.sleep(0.01)
        source_is_url = isinstance(pdf_input_source_path_or_url, str) and \
            pdf_input_source_path_or_url.startswith(('http://', 'https://'))
        pdf_handle_for_text = None
        pdf_bytes_for_images = None
        if source_is_url:
            try:
                # Download fully into memory: pdfplumber needs a seekable
                # handle and pdf2image can reuse the same bytes later.
                response = requests.get(pdf_input_source_path_or_url, stream=False, timeout=60)
                response.raise_for_status()
                pdf_bytes_for_images = response.content
                pdf_handle_for_text = io.BytesIO(pdf_bytes_for_images)
                yield yield_message("status", {"message": f"PDF downloaded from URL ({len(pdf_bytes_for_images)/1024:.2f} KB)."})
                time.sleep(0.01)
            except RequestsHTTPError as e:
                logger.error(f"URL fetch HTTP error for PDF processing: {str(e)} (Status: {e.response.status_code if e.response else 'N/A'})", exc_info=True)
                yield yield_message("error", {"message": f"Error fetching PDF from URL (HTTP {e.response.status_code if e.response else 'N/A'}): {e.response.reason if e.response else str(e)}"})
                return
            except requests.RequestException as e:
                logger.error(f"URL fetch network error for PDF processing: {str(e)}", exc_info=True)
                yield yield_message("error", {"message": f"Network error fetching PDF from URL: {str(e)}"})
                return
        else:
            pdf_handle_for_text = pdf_input_source_path_or_url
        # ---- Phase 1: per-page text extraction via pdfplumber ----
        total_text_pages = 0
        try:
            with pdfplumber.open(pdf_handle_for_text) as pdf:
                total_text_pages = len(pdf.pages)
                yield yield_message("status", {"message": f"Found {total_text_pages} page(s) for text extraction."})
                time.sleep(0.01)
                for i, page in enumerate(pdf.pages):
                    yield yield_message("status", {"message": f"Extracting text from page {i+1}/{total_text_pages}..."})
                    time.sleep(0.01)
                    page_text = page.extract_text(layout=True, x_density=1, y_density=1) or ""
                    # Table extraction was deliberately removed; only the
                    # layout text is converted to Markdown.
                    formatted_page_text_md = format_page_text_to_markdown_chunk(page_text)
                    yield yield_message("markdown_chunk", {"content": formatted_page_text_md})
                    time.sleep(0.01)
        except Exception as e:
            # Text-phase failure is non-fatal: the image phase below still
            # runs so the client gets at least partial output.
            logger.error(f"Error during PDF text extraction: {str(e)}", exc_info=True)
            yield yield_message("error", {"message": f"Error during text extraction: {str(e)}"})
        # ---- Phase 2: page images via pdf2image/Poppler, OCR, HF upload ----
        if not check_poppler():
            yield yield_message("error", {"message": "Poppler (for image extraction) not found or not working."})
        else:
            yield yield_message("status", {"message": "Starting image extraction..."})
            yield yield_message("markdown_chunk", {"content": "## Extracted Images\n\n"})
            if not HF_TOKEN:
                yield yield_message("markdown_chunk", {"content": "**Note:** `HF_TOKEN` not set. Images will be described but not uploaded.\n\n"})
            time.sleep(0.01)
            # Running index used for image captions across batched + fallback paths.
            extracted_pil_images_overall_count = 0
            try:
                image_source_for_convert = None
                if source_is_url and pdf_bytes_for_images:
                    image_source_for_convert = pdf_bytes_for_images
                    logger.info("Using downloaded bytes for image conversion.")
                elif not source_is_url:
                    image_source_for_convert = pdf_input_source_path_or_url
                    logger.info("Using local file path for image conversion.")
                if image_source_for_convert:
                    try:
                        # Ask Poppler for the page count so pages can be
                        # rendered one at a time (keeps memory bounded).
                        pdf_info = None
                        if isinstance(image_source_for_convert, bytes):
                            pdf_info = pdf2image.pdfinfo_from_bytes(image_source_for_convert, userpw=None, poppler_path=None)
                        else:
                            pdf_info = pdf2image.pdfinfo_from_path(image_source_for_convert, userpw=None, poppler_path=None)
                        num_image_pages = pdf_info.get("Pages", 0)
                        yield yield_message("status", {"message": f"PDF has {num_image_pages} page(s) for potential image extraction."})
                        batch_size = 1  # one page per conversion call
                        for page_idx_start in range(1, num_image_pages + 1, batch_size):
                            page_idx_end = min(page_idx_start + batch_size - 1, num_image_pages)
                            yield yield_message("status", {"message": f"Extracting images from PDF page(s) {page_idx_start}-{page_idx_end}..."})
                            time.sleep(0.01)
                            page_images_pil = []
                            if isinstance(image_source_for_convert, bytes):
                                page_images_pil = convert_from_bytes(image_source_for_convert, dpi=150, first_page=page_idx_start, last_page=page_idx_end)
                            else:
                                page_images_pil = convert_from_path(image_source_for_convert, dpi=150, first_page=page_idx_start, last_page=page_idx_end)
                            for img_idx_in_batch, img_pil in enumerate(page_images_pil):
                                extracted_pil_images_overall_count += 1
                                current_pdf_page_num = page_idx_start + img_idx_in_batch  # actual PDF page number
                                page_num_for_log = f"pdfpage_{current_pdf_page_num}"
                                yield yield_message("status", {"message": f"Processing image {extracted_pil_images_overall_count} (from PDF page {current_pdf_page_num}) (OCR & Upload)..."})
                                time.sleep(0.01)
                                ocr_text = ""
                                try:
                                    ocr_text = pytesseract.image_to_string(img_pil).strip()
                                    if ocr_text: yield yield_message("status", {"message": f" OCR successful for image {extracted_pil_images_overall_count}."})
                                except Exception as ocr_e:
                                    # OCR failure is reported inline rather than aborting the page.
                                    logger.error(f"OCR error for image {extracted_pil_images_overall_count}: {str(ocr_e)}")
                                    ocr_text = f"OCR failed: {str(ocr_e)}"
                                image_md_chunk = ""
                                if HF_TOKEN:
                                    image_url_or_error = upload_image_to_hf_stream(img_pil, "pdf_image", page_num_for_log)
                                    # upload helper returns a URL or an "Error..."-prefixed string.
                                    if isinstance(image_url_or_error, str) and not image_url_or_error.startswith("Error"):
                                        image_md_chunk += f"![Image {extracted_pil_images_overall_count}]({image_url_or_error})\n"
                                        yield yield_message("status", {"message": f" Image {extracted_pil_images_overall_count} uploaded."})
                                    else:
                                        image_md_chunk += f"**Image {extracted_pil_images_overall_count} (Upload Error):** {str(image_url_or_error)}\n\n"
                                        yield yield_message("error", {"message": f"Failed to upload image {extracted_pil_images_overall_count}: {str(image_url_or_error)}"})
                                else:
                                    image_md_chunk += f"**Image {extracted_pil_images_overall_count} (not uploaded due to missing HF_TOKEN)**\n"
                                if ocr_text:
                                    image_md_chunk += f"**Image {extracted_pil_images_overall_count} OCR Text:**\n```\n{ocr_text}\n```\n\n"
                                yield yield_message("image_md", {"content": image_md_chunk})
                                time.sleep(0.01)
                    except Exception as e_img_info:
                        # Batched conversion failed (e.g. pdfinfo error):
                        # fall back to converting the whole PDF in one call.
                        logger.error(f"Could not get PDF info for image batching or during batched conversion: {e_img_info}", exc_info=True)
                        yield yield_message("error", {"message": f"Error preparing for image extraction: {e_img_info}. Trying bulk."})
                        bulk_images_pil = []
                        if isinstance(image_source_for_convert, bytes):
                            bulk_images_pil = convert_from_bytes(image_source_for_convert, dpi=150)
                        else:
                            bulk_images_pil = convert_from_path(image_source_for_convert, dpi=150)
                        yield yield_message("status", {"message": f"Fallback: Extracted {len(bulk_images_pil)} images in bulk."})
                        for i, img_pil in enumerate(bulk_images_pil):
                            extracted_pil_images_overall_count +=1
                            page_num_for_log = f"bulk_image_{i+1}"  # less precise page info in fallback
                            yield yield_message("status", {"message": f"Processing image {extracted_pil_images_overall_count} (bulk) (OCR & Upload)..."})
                            ocr_text = ""
                            try: ocr_text = pytesseract.image_to_string(img_pil).strip()
                            except Exception as e: ocr_text = f"OCR Error: {e}"
                            image_md_chunk = f"![Image {extracted_pil_images_overall_count} (Fallback)]\n"
                            if HF_TOKEN:
                                image_url_or_error = upload_image_to_hf_stream(img_pil, "pdf_image_fallback", page_num_for_log)
                                if isinstance(image_url_or_error, str) and not image_url_or_error.startswith("Error"):
                                    image_md_chunk = f"![Image {extracted_pil_images_overall_count} (Fallback)]({image_url_or_error})\n"
                                else:
                                    image_md_chunk += f"**Upload Error:** {str(image_url_or_error)}\n"
                            if ocr_text: image_md_chunk += f"**OCR Text:**\n```\n{ocr_text}\n```\n\n"
                            else: image_md_chunk += "\n"
                            yield yield_message("image_md", {"content": image_md_chunk})
                            time.sleep(0.01)
                else:
                    yield yield_message("status", {"message": "No valid source for image extraction."})
            except Exception as e:
                logger.error(f"Error during image extraction/processing: {str(e)}", exc_info=True)
                yield yield_message("error", {"message": f"Error during image extraction: {str(e)}"})
        yield yield_message("final_status", {"message": "All processing stages complete."})
    except Exception as e:
        # Last-resort guard so the stream always ends with a JSON event
        # instead of an unhandled exception mid-response.
        logger.error(f"Unhandled error in PDF conversion stream: {str(e)}", exc_info=True)
        yield yield_message("error", {"message": f"Critical processing error: {str(e)}"})
# --- Flask Routes ---
@app.route('/', methods=['GET'])
def index():
    """Serve the upload form (templates/index.html)."""
    return render_template('index.html')
@app.route('/process-stream', methods=['POST'])
def process_pdf_stream():
    """Accept an uploaded PDF (or a PDF URL) and stream conversion progress.

    Responds with newline-delimited JSON (application/x-ndjson); each line
    is one event produced by generate_pdf_conversion_stream(). An uploaded
    file is saved to a temp path which is removed when the stream finishes.
    """
    pdf_file = request.files.get('pdf_file')
    pdf_url = request.form.get('pdf_url', '').strip()
    outer_temp_pdf_path = None

    def stream_processor():
        nonlocal outer_temp_pdf_path
        pdf_input_source_for_generator = None
        try:
            if pdf_file and pdf_file.filename:
                if not pdf_file.filename.lower().endswith('.pdf'):
                    yield yield_message("error", {"message": "Uploaded file is not a PDF."})
                    return
                filename = secure_filename(pdf_file.filename)
                os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
                # mkstemp gives a collision-free path; the fd itself is not needed.
                fd, temp_path = tempfile.mkstemp(suffix=".pdf", prefix="upload_", dir=app.config['UPLOAD_FOLDER'])
                os.close(fd)
                pdf_file.save(temp_path)
                outer_temp_pdf_path = temp_path
                logger.info(f"Uploaded PDF saved to temporary path: {outer_temp_pdf_path}")
                pdf_input_source_for_generator = outer_temp_pdf_path
                # BUGFIX: report the sanitized filename instead of the
                # literal "(unknown)" placeholder that was never filled in.
                yield yield_message("status", {"message": f"Processing uploaded PDF: {filename}"})
                time.sleep(0.01)
            elif pdf_url:
                unquoted_url = urllib.parse.unquote(pdf_url)
                if not (unquoted_url.startswith('http://') or unquoted_url.startswith('https://')):
                    yield yield_message("error", {"message": "Invalid URL scheme. Must be http or https."})
                    return
                pdf_input_source_for_generator = unquoted_url
                yield yield_message("status", {"message": f"Preparing to process PDF from URL: {unquoted_url}"})
                time.sleep(0.01)
            else:
                yield yield_message("error", {"message": "No PDF file uploaded and no PDF URL provided."})
                return
            # Delegate the actual conversion; forward its events verbatim.
            for message_part in generate_pdf_conversion_stream(pdf_input_source_for_generator):
                yield message_part
        except Exception as e:
            logger.error(f"Error setting up stream or in initial validation: {str(e)}", exc_info=True)
            yield yield_message("error", {"message": f"Setup error: {str(e)}"})
        finally:
            # Clean up the uploaded temp file even if the client disconnects.
            if outer_temp_pdf_path and os.path.exists(outer_temp_pdf_path):
                try:
                    os.remove(outer_temp_pdf_path)
                    logger.info(f"Cleaned up temporary PDF: {outer_temp_pdf_path}")
                except OSError as ose:
                    logger.error(f"Error removing temporary PDF {outer_temp_pdf_path}: {ose}")
    return Response(stream_with_context(stream_processor()), mimetype='application/x-ndjson')
# --- Main Execution ---
# --- Main Execution ---
if __name__ == '__main__':
    if not check_poppler():
        logger.warning("Poppler utilities might not be installed correctly. PDF processing might fail.")
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    # SECURITY FIX: the Werkzeug debugger allows remote code execution, so
    # never enable debug unconditionally on a 0.0.0.0-bound server. Debug is
    # now opt-in via the FLASK_DEBUG environment variable.
    debug_mode = os.getenv("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=int(os.getenv("PORT", 7860)), debug=debug_mode, threaded=True)