# --- Web page header captured with the file (not part of the program) ---
# Jobey1's picture
# Update app.py
# 43a7a2a verified
# raw
# history blame
# 7.17 kB
import gradio as gr
import pandas as pd
import fitz # PyMuPDF
import os
import re
from huggingface_hub import HfApi
from huggingface_hub.utils import HfHubHTTPError
import time
def extract_full_paper_with_labels(pdf_path, progress=None):
    """Extract the text of a PDF and wrap each text block in a heuristic label tag.

    Every text block returned by PyMuPDF's ``page.get_text("dict")`` is
    classified by the first matching rule, in this order: title (first page,
    font size > 15), authors, year, DOI, abstract, footnote (font size < 10),
    reference, table, figure, equation, code, financial metric, paragraph.
    Title, authors, year, DOI and abstract are captured at most once each.
    Footnote and reference text is accumulated and appended at the end of the
    output as a single ``<FOOTNOTE>`` / ``<REFERENCE>`` element.

    Args:
        pdf_path: Path of the PDF file to read.
        progress: Optional callable invoked once per page as
            ``progress(fraction_done, desc=...)``; ignored when ``None``.

    Returns:
        A dict with keys ``"filename"`` (base name of ``pdf_path``) and
        ``"content"`` (the tagged text, one element per line).
    """
    # Compile the detection patterns once instead of on every block.
    # DOIs are case-insensitive and are usually written in lowercase.
    doi_re = re.compile(r"\b10\.\d{4,9}/[-._;()/:A-Z0-9]+\b", re.IGNORECASE)
    year_re = re.compile(r"\b(19|20)\d{2}\b")
    # Word boundaries keep "by" from matching inside words such as "nearby".
    author_re = re.compile(r"\b(?:authors?|by)\b", re.IGNORECASE)
    table_re = re.compile(r"table\s*\d+", re.IGNORECASE)
    figure_re = re.compile(r"figure\s*\d+", re.IGNORECASE)
    equation_re = re.compile(r"=|∑|√|±|×|π|μ|σ")
    code_re = re.compile(
        r"(def\s+\w+\s*\(|class\s+\w+|import\s+\w+|for\s+\w+\s+in|if\s+\w+|while\s+\w+|try:|except|{|}|;)"
    )
    reference_keywords = ['reference', 'bibliography', 'sources']
    financial_keywords = ['p/e', 'volatility', 'market cap', 'roi', 'sharpe', 'drawdown']

    filename = os.path.basename(pdf_path)
    print(f"📝 Starting PDF Processing: {filename}")

    # Collect output fragments in lists and join once at the end.
    parts = []
    footnote_parts = []
    reference_parts = []

    # One-shot metadata fields; each is filled by the first block that matches.
    title = ""
    authors = ""
    year = ""
    doi = ""
    abstract = ""

    # The context manager guarantees the document is closed, even on error.
    with fitz.open(pdf_path) as doc:
        total_pages = len(doc)
        for page_num, page in enumerate(doc):
            if progress is not None:
                progress((page_num + 1) / total_pages, desc=f"Processing Page {page_num + 1}/{total_pages}")

            for block in page.get_text("dict")["blocks"]:
                # Only blocks carrying a "lines" entry hold text spans.
                if "lines" not in block:
                    continue

                spans = [span for line in block["lines"] for span in line["spans"]]
                text = " ".join(span["text"] for span in spans).strip()
                if not text:
                    # Nothing to label; avoid emitting empty elements.
                    continue
                max_font_size = max((span["size"] for span in spans), default=0)
                lowered = text.lower()

                # Only search for year/DOI while they are still missing.
                year_match = None if year else year_re.search(text)
                doi_match = None if doi else doi_re.search(text)

                # Title (first page, largest font)
                if page_num == 0 and max_font_size > 15 and not title:
                    title = text
                    parts.append(f"<TITLE>{title}</TITLE>\n")
                # Authors
                elif not authors and author_re.search(text):
                    authors = text
                    parts.append(f"<AUTHORS>{authors}</AUTHORS>\n")
                # Year
                elif year_match:
                    year = year_match.group(0)
                    parts.append(f"<YEAR>{year}</YEAR>\n")
                # DOI
                elif doi_match:
                    doi = doi_match.group(0)
                    parts.append(f"<DOI>{doi}</DOI>\n")
                # Abstract
                elif "abstract" in lowered and not abstract:
                    abstract = text
                    parts.append(f"<ABSTRACT>{abstract}</ABSTRACT>\n")
                # Footnotes (small fonts)
                elif max_font_size < 10:
                    footnote_parts.append(text)
                # References
                elif any(keyword in lowered for keyword in reference_keywords):
                    reference_parts.append(text)
                # Tables
                elif table_re.search(text):
                    parts.append(f"<TABLE>{text}</TABLE>\n")
                # Figures
                elif figure_re.search(text):
                    parts.append(f"<FIGURE>{text}</FIGURE>\n")
                # Equations (look for math symbols)
                elif equation_re.search(text):
                    parts.append(f"<EQUATION>{text}</EQUATION>\n")
                # Code: pattern hit in a short block (at most 50 words)
                elif code_re.search(text) and len(text.split()) <= 50:
                    parts.append(f"<CODE>{text}</CODE>\n")
                # Financial metrics
                elif any(fin_kw in lowered for fin_kw in financial_keywords):
                    parts.append(f"<FINANCIAL_METRIC>{text}</FINANCIAL_METRIC>\n")
                # Regular paragraph
                else:
                    parts.append(f"<PARAGRAPH>{text}</PARAGRAPH>\n")

    # Footnotes and references are emitted once, after all page content.
    if footnote_parts:
        parts.append(f"<FOOTNOTE>{' '.join(footnote_parts)}</FOOTNOTE>\n")
    if reference_parts:
        parts.append(f"<REFERENCE>{' '.join(reference_parts)}</REFERENCE>\n")

    print(f"✅ Finished Processing PDF: {filename}")
    return {
        "filename": filename,
        "content": "".join(parts),
    }
def process_pdf_file(pdf_file, api_key, repo_address):
    """Convert an uploaded PDF to a parquet file and optionally upload it.

    Args:
        pdf_file: The upload as delivered by the UI. Accepted forms are a
            path (``str`` or ``os.PathLike``), an object with a ``name``
            attribute holding the path, or a dict with a ``"name"`` key.
            ``None`` means nothing was uploaded.
        api_key: Token passed to ``HfApi.upload_file``; falsy to skip upload.
        repo_address: Target repo id for the upload; falsy to skip upload.

    Returns:
        A ``(parquet_filename, status_message)`` tuple. ``parquet_filename``
        is ``None`` when no file was uploaded.
    """
    if pdf_file is None:
        return None, "No PDF file uploaded."

    # Resolve the on-disk path. Plain paths are checked first: a str has no
    # ``name`` attribute (indexing it with 'name' raises TypeError) and a
    # pathlib.Path's ``name`` is only the base name, not the full path.
    if isinstance(pdf_file, (str, os.PathLike)):
        file_path = os.fspath(pdf_file)
    elif hasattr(pdf_file, "name"):
        file_path = pdf_file.name
    else:
        file_path = pdf_file['name']

    result = extract_full_paper_with_labels(file_path)

    # One-row DataFrame (filename, content) written next to the working dir.
    df = pd.DataFrame([result])
    base = os.path.splitext(result['filename'])[0]
    parquet_filename = f"{base}.parquet"
    df.to_parquet(parquet_filename, index=False)

    # Upload is optional: without both credentials the file is only offered
    # for local download.
    if not (api_key and repo_address):
        return parquet_filename, "API key or repo address not provided, skipping repo upload."

    api = HfApi()
    try:
        api.upload_file(
            path_or_fileobj=parquet_filename,
            path_in_repo=parquet_filename,
            repo_id=repo_address,
            token=api_key,
        )
    except Exception as e:
        # UI boundary: report any upload failure as a status message instead
        # of crashing the request; the local parquet file is still returned.
        repo_status = f"Failed to upload to repo: {str(e)}"
    else:
        repo_status = f"File uploaded to repo {repo_address} successfully."

    # Return the parquet file for local download and the status message.
    return parquet_filename, repo_status
# Reset callback for the file-related widgets only; the API key and repo
# address fields are not among its outputs and therefore keep their values.
def clear_files():
    """Return the reset values for the upload, download and status widgets."""
    cleared_upload = None
    cleared_download = None
    cleared_status = ""
    return cleared_upload, cleared_download, cleared_status
# Gradio interface setup
with gr.Blocks() as demo:
    # Credentials row. The API key is a secret token, so it is rendered as a
    # masked password field instead of plain text.
    with gr.Row():
        api_key_input = gr.Textbox(label="API Key", placeholder="Enter API Key", type="password")
        repo_address_input = gr.Textbox(label="Repo Address", placeholder="Enter Repo Address")

    # Input row: the PDF upload plus the two action buttons.
    with gr.Row():
        pdf_file_input = gr.File(label="Upload PDF")
        convert_button = gr.Button("Convert to Parquet")
        clear_button = gr.Button("Clear Files")

    # Output row: the generated parquet file and the upload status message.
    with gr.Row():
        download_file_output = gr.File(label="Download Parquet File")
        repo_status_output = gr.Textbox(label="Repo Upload Status")

    # Convert: run the PDF -> parquet pipeline and show its two results.
    convert_button.click(
        process_pdf_file,
        inputs=[pdf_file_input, api_key_input, repo_address_input],
        outputs=[download_file_output, repo_status_output],
    )

    # The clear button only clears file-related components; API key and Repo
    # Address remain unchanged because they are not listed as outputs.
    clear_button.click(
        clear_files,
        inputs=None,
        outputs=[pdf_file_input, download_file_output, repo_status_output],
    )

demo.launch()