# Source: Hugging Face file view of app.py (uploaded by Jobey1)
# Commit 9323459 (verified): "Update app.py" - 7.27 kB
import gradio as gr
import pandas as pd
import fitz # PyMuPDF
import os
import re
from huggingface_hub import HfApi
from huggingface_hub.utils import HfHubHTTPError
import time
def sanitize_filename(title, max_length=200, fallback="Untitled_Paper"):
    """Turn a paper title into a string that is safe to use as a file name.

    Characters that are reserved in file names on common platforms (path
    separators, wildcards, quotes, angle brackets, colon and pipe) are
    dropped, every whitespace character is replaced by an underscore, and
    any remaining control characters are removed.

    Args:
        title: Raw title text; ``None`` or an empty string is allowed.
        max_length: Maximum number of characters to keep, or ``None`` to
            disable truncation.
        fallback: Name returned when nothing usable is left after cleaning.

    Returns:
        The sanitized name, or ``fallback`` if the result would be empty.
    """
    cleaned = re.sub(r'[\\/*?:"<>|]', "", title or "")
    # Newlines and tabs inside a title would otherwise end up in the file name.
    cleaned = re.sub(r"\s", "_", cleaned)
    cleaned = re.sub(r"[\x00-\x1f\x7f]", "", cleaned)
    if max_length is not None:
        cleaned = cleaned[:max_length]
    # An empty name would produce a hidden ".parquet" file downstream.
    return cleaned or fallback
# Patterns and keyword lists used to label text blocks. They are compiled once
# at import time instead of being re-parsed for every block.
# DOI suffixes are case-insensitive, so lowercase DOIs must match as well.
_DOI_RE = re.compile(r"\b10\.\d{4,9}/[-._;()/:A-Z0-9]+\b", re.IGNORECASE)
_YEAR_RE = re.compile(r"\b(?:19|20)\d{2}\b")
# Word boundaries keep "by" from matching inside words such as "hybrid".
_AUTHOR_RE = re.compile(r"\b(?:authors?|by)\b", re.IGNORECASE)
_TABLE_RE = re.compile(r"table\s*\d+", re.IGNORECASE)
_FIGURE_RE = re.compile(r"figure\s*\d+", re.IGNORECASE)
_EQUATION_RE = re.compile(r"[=∑√±×πμσ]")
_CODE_RE = re.compile(
    r"(def\s+\w+\s*\(|class\s+\w+|import\s+\w+|for\s+\w+\s+in|if\s+\w+|while\s+\w+|try:|except|{|\}|;)"
)
_REFERENCE_KEYWORDS = ("reference", "bibliography", "sources")
_FINANCIAL_KEYWORDS = ("p/e", "volatility", "market cap", "roi", "sharpe", "drawdown")


def _block_text_and_size(block):
    """Return the block's span texts joined by spaces and its largest font size."""
    pieces = []
    largest = 0
    for line in block["lines"]:
        for span in line["spans"]:
            pieces.append(span["text"])
            if span["size"] > largest:
                largest = span["size"]
    return " ".join(pieces).strip(), largest


def _label_block(text, max_font_size, on_first_page, found):
    """Pick the tag for one text block and the payload to store under it.

    The checks run in priority order and the first match wins. ``found`` holds
    the one-shot tags (TITLE, AUTHORS, YEAR, DOI, ABSTRACT) already assigned,
    so each of them is used at most once per document.
    """
    if on_first_page and max_font_size > 15 and "TITLE" not in found:
        return "TITLE", text
    if "AUTHORS" not in found and _AUTHOR_RE.search(text):
        return "AUTHORS", text
    if "YEAR" not in found:
        year_match = _YEAR_RE.search(text)
        if year_match:
            return "YEAR", year_match.group(0)
    if "DOI" not in found:
        doi_match = _DOI_RE.search(text)
        if doi_match:
            return "DOI", doi_match.group(0)

    lowered = text.lower()
    if "ABSTRACT" not in found and "abstract" in lowered:
        return "ABSTRACT", text
    if max_font_size < 10:
        return "FOOTNOTE", text
    if any(keyword in lowered for keyword in _REFERENCE_KEYWORDS):
        return "REFERENCE", text
    if _TABLE_RE.search(text):
        return "TABLE", text
    if _FIGURE_RE.search(text):
        return "FIGURE", text
    if _EQUATION_RE.search(text):
        return "EQUATION", text
    if _CODE_RE.search(text) and len(text.split()) <= 50:
        return "CODE", text
    if any(keyword in lowered for keyword in _FINANCIAL_KEYWORDS):
        return "FINANCIAL_METRIC", text
    return "PARAGRAPH", text


def extract_full_paper_with_labels(pdf_path, progress=None):
    """Extract the text of a PDF and wrap each text block in a heuristic tag.

    Every text block of every page is labelled with one tag (TITLE, AUTHORS,
    YEAR, DOI, ABSTRACT, TABLE, FIGURE, EQUATION, CODE, FINANCIAL_METRIC or
    PARAGRAPH) and appended to the output in document order. Blocks labelled
    as footnotes or references are collected separately and appended at the
    end as a single FOOTNOTE and a single REFERENCE element.

    Args:
        pdf_path: Path of the PDF file to read.
        progress: Optional callable invoked once per page as
            ``progress(fraction, desc=...)``.

    Returns:
        A dict with the keys ``"filename"`` (base name of ``pdf_path``),
        ``"title"`` (detected title or ``"Untitled_Paper"``) and
        ``"content"`` (the tagged text).
    """
    filename = os.path.basename(pdf_path)
    print(f"📄 Starting PDF Processing: {filename}")

    tagged_lines = []
    footnotes = []
    references = []
    found = {}

    doc = fitz.open(pdf_path)
    try:
        total_pages = len(doc)
        for page_num, page in enumerate(doc):
            if progress is not None:
                progress((page_num + 1) / total_pages, desc=f"Processing Page {page_num + 1}/{total_pages}")

            for block in page.get_text("dict")["blocks"]:
                if "lines" not in block:
                    continue
                text, max_font_size = _block_text_and_size(block)
                if not text:
                    # An empty block would otherwise emit an empty tag, e.g.
                    # <TITLE></TITLE>, without ever setting the title.
                    continue

                tag, payload = _label_block(text, max_font_size, page_num == 0, found)
                if tag == "FOOTNOTE":
                    footnotes.append(payload)
                elif tag == "REFERENCE":
                    references.append(payload)
                else:
                    if tag in ("TITLE", "AUTHORS", "YEAR", "DOI", "ABSTRACT"):
                        found[tag] = payload
                    tagged_lines.append(f"<{tag}>{payload}</{tag}>\n")
    finally:
        # Release the file handle even when a page fails to parse.
        doc.close()

    if footnotes:
        tagged_lines.append(f"<FOOTNOTE>{' '.join(footnotes)}</FOOTNOTE>\n")
    if references:
        tagged_lines.append(f"<REFERENCE>{' '.join(references)}</REFERENCE>\n")

    print(f"✅ Finished Processing PDF: {filename}")
    return {
        "filename": filename,
        "title": found.get("TITLE") or "Untitled_Paper",
        "content": "".join(tagged_lines),
    }
def _upload_parquet_to_hub(parquet_path, dataset_repo_id, hf_token):
    """Upload one Parquet file to a Hugging Face dataset repo and return a status message."""
    file_name = os.path.basename(parquet_path)
    HfApi().upload_file(
        path_or_fileobj=parquet_path,
        path_in_repo=file_name,
        repo_id=dataset_repo_id,
        repo_type="dataset",
        token=hf_token,
    )
    return f"✅ Uploaded {file_name} to {dataset_repo_id}"


def pdf_to_parquet_and_upload(pdf_files, hf_token, dataset_repo_id, action_choice, progress=gr.Progress()):
    """Convert uploaded PDFs to Parquet files and optionally upload them.

    Each PDF is run through ``extract_full_paper_with_labels``; the result is
    written as a one-row Parquet file named after the sanitized paper title.
    When ``action_choice`` is ``"Upload to Hugging Face"`` or ``"Both"``, the
    file is then uploaded to ``dataset_repo_id``.

    Args:
        pdf_files: Uploaded files, either path strings or objects exposing the
            path as ``.name``.
        hf_token: Hugging Face API token used for uploads.
        dataset_repo_id: Target dataset repo, e.g. ``"username/research-dataset"``.
        action_choice: One of the labels offered by the "Action" radio button.
        progress: Progress callback called as ``progress(fraction, desc=...)``,
            or ``None`` to disable progress reporting.

    Returns:
        A ``(parquet_path, status_message)`` tuple. ``parquet_path`` is the
        file written for the LAST processed PDF, or ``None`` when there was no
        input or a Parquet conversion failed. ``status_message`` holds one
        line per attempted upload and is empty when nothing was uploaded.
    """
    if not pdf_files:
        # Without this guard an empty submission fails with an unbound
        # variable (or a TypeError for None) instead of a readable status.
        return None, "❌ No PDF files were provided."

    total_files = len(pdf_files)
    upload_messages = []
    parquet_file = None
    print("🚀 Starting PDF to Parquet Conversion Process")

    for idx, pdf_file in enumerate(pdf_files):
        if progress is not None:
            progress(idx / total_files, desc=f"Processing File {idx + 1}/{total_files}")

        # Step 1: extract the labelled text. Accept both plain path strings
        # and file objects that carry the path in `.name`.
        pdf_path = getattr(pdf_file, "name", pdf_file)
        extracted_data = extract_full_paper_with_labels(pdf_path, progress=progress)

        # Step 2: name the Parquet file after the paper title.
        # NOTE: two papers with the same title overwrite each other.
        sanitized_title = sanitize_filename(extracted_data["title"])
        parquet_file = f"{sanitized_title}.parquet"

        df = pd.DataFrame([extracted_data])
        try:
            df.to_parquet(parquet_file, engine='pyarrow', index=False)
            print(f"✅ Parquet saved as: {parquet_file}")
        except Exception as e:
            print(f"❌ Parquet Conversion Failed: {str(e)}")
            return None, f"❌ Parquet Conversion Failed: {str(e)}"

        # Step 3: upload the Parquet file if requested. A failed upload is
        # reported in the status text and does not stop the remaining files.
        if action_choice in ["Upload to Hugging Face", "Both"]:
            try:
                upload_messages.append(_upload_parquet_to_hub(parquet_file, dataset_repo_id, hf_token))
            except Exception as e:
                print(f"❌ Upload Failed: {str(e)}")
                upload_messages.append(f"❌ Upload failed: {str(e)}")

    print("🏁 Process Completed")
    return parquet_file, "\n".join(upload_messages)
# ✅ Gradio Interface
# Input widgets, in the positional order expected by pdf_to_parquet_and_upload:
# uploaded PDFs, API token, dataset repo id, action to perform.
input_components = [
    gr.File(
        file_types=[".pdf"],
        file_count="multiple",
        label="Upload PDFs (Drag & Drop or Search)",
    ),
    gr.Textbox(
        label="Hugging Face API Token",
        type="password",
        placeholder="Enter your Hugging Face API token",
    ),
    gr.Textbox(
        label="Your Dataset Repo ID (e.g., username/research-dataset)",
        placeholder="username/research-dataset",
    ),
    gr.Radio(
        ["Download Locally", "Upload to Hugging Face", "Both"],
        label="Action",
        value="Download Locally",
    ),
]

# Output widgets matching the handler's (parquet file, status message) return value.
output_components = [
    gr.File(label="Download Parquet File"),
    gr.Textbox(label="Status"),
]

iface = gr.Interface(
    fn=pdf_to_parquet_and_upload,
    inputs=input_components,
    outputs=output_components,
    title="PDF to Parquet Converter with Title-Based Naming",
    description="Upload your PDFs, convert them to Parquet files named after the paper title, and upload to your Hugging Face Dataset.",
)

# Start the web UI as soon as the module is executed.
iface.launch()