|
import gradio as gr |
|
import pandas as pd |
|
import fitz |
|
import os |
|
import re |
|
from huggingface_hub import HfApi |
|
from huggingface_hub.utils import HfHubHTTPError |
|
import time |
|
|
|
def extract_full_paper_with_labels(pdf_path, progress=None):
    """Extract a PDF's text into a single pseudo-XML tagged string.

    Walks every text block on every page and wraps it in a tag
    (<TITLE>, <AUTHORS>, <YEAR>, <DOI>, <ABSTRACT>, <TABLE>, <FIGURE>,
    <EQUATION>, <CODE>, <FINANCIAL_METRIC>, <PARAGRAPH>) chosen by simple
    first-match heuristics based on font size, keywords, and regexes.
    Footnote-sized text and reference sections are accumulated and emitted
    once at the end as <FOOTNOTE>/<REFERENCE>.

    Args:
        pdf_path: Filesystem path to the PDF.
        progress: Optional callable(fraction, desc=...) — e.g. a Gradio
            progress tracker — invoked once per page.

    Returns:
        dict with keys "filename" (basename of pdf_path) and "content"
        (the concatenated tagged text).

    Raises:
        Exception: if page iteration exceeds a defensive limit
            (possible malformed PDF).
    """
    print(f"📝 Starting PDF Processing: {os.path.basename(pdf_path)}")
    doc = fitz.open(pdf_path)

    # Collect fragments in a list and join once at the end — repeated
    # `content += ...` is quadratic on large documents.
    parts = []

    title = ""
    authors = ""
    year = ""
    doi = ""
    abstract = ""
    footnotes = ""
    references = ""

    total_pages = len(doc)
    max_iterations = total_pages * 2  # defensive cap on page iteration
    iteration_count = 0

    # DOI suffixes are case-insensitive and commonly lowercase, so the
    # character class must cover a-z as well as A-Z.
    doi_pattern = r"\b10\.\d{4,9}/[-._;()/:A-Za-z0-9]+\b"
    year_pattern = r'\b(19|20)\d{2}\b'
    code_pattern = r"(def\s+\w+\s*\(|class\s+\w+|import\s+\w+|for\s+\w+\s+in|if\s+\w+|while\s+\w+|try:|except|{|}|;)"
    reference_keywords = ['reference', 'bibliography', 'sources']
    financial_keywords = ['p/e', 'volatility', 'market cap', 'roi', 'sharpe', 'drawdown']

    try:
        for page_num, page in enumerate(doc):
            iteration_count += 1
            if iteration_count > max_iterations:
                raise Exception("⚠️ PDF processing exceeded iteration limit. Possible malformed PDF.")

            if progress is not None:
                progress((page_num + 1) / total_pages, desc=f"Processing Page {page_num + 1}/{total_pages}")

            blocks = page.get_text("dict")["blocks"]
            for block in blocks:
                # Image blocks have no "lines" key; skip them.
                if "lines" not in block:
                    continue

                text = ""
                max_font_size = 0
                for line in block["lines"]:
                    for span in line["spans"]:
                        text += span["text"] + " "
                        if span["size"] > max_font_size:
                            max_font_size = span["size"]

                text = text.strip()

                # Heuristic classification; first match wins.
                if page_num == 0 and max_font_size > 15 and not title:
                    # Largest text on the first page is taken as the title.
                    title = text
                    parts.append(f"<TITLE>{title}</TITLE>\n")

                elif re.search(r'author|by', text, re.IGNORECASE) and not authors:
                    authors = text
                    parts.append(f"<AUTHORS>{authors}</AUTHORS>\n")

                elif (year_match := re.search(year_pattern, text)) and not year:
                    year = year_match.group(0)
                    parts.append(f"<YEAR>{year}</YEAR>\n")

                elif (doi_match := re.search(doi_pattern, text)) and not doi:
                    doi = doi_match.group(0)
                    parts.append(f"<DOI>{doi}</DOI>\n")

                elif "abstract" in text.lower() and not abstract:
                    abstract = text
                    parts.append(f"<ABSTRACT>{abstract}</ABSTRACT>\n")

                elif max_font_size < 10:
                    # Small type: accumulate as footnote material.
                    footnotes += text + " "

                elif any(keyword in text.lower() for keyword in reference_keywords):
                    references += text + " "

                elif re.search(r"table\s*\d+", text, re.IGNORECASE):
                    parts.append(f"<TABLE>{text}</TABLE>\n")

                elif re.search(r"figure\s*\d+", text, re.IGNORECASE):
                    parts.append(f"<FIGURE>{text}</FIGURE>\n")

                elif re.search(r"=|∑|√|±|×|π|μ|σ", text):
                    parts.append(f"<EQUATION>{text}</EQUATION>\n")

                elif re.search(code_pattern, text) and len(text.split()) <= 50:
                    parts.append(f"<CODE>{text}</CODE>\n")

                elif any(fin_kw in text.lower() for fin_kw in financial_keywords):
                    parts.append(f"<FINANCIAL_METRIC>{text}</FINANCIAL_METRIC>\n")

                else:
                    parts.append(f"<PARAGRAPH>{text}</PARAGRAPH>\n")
    finally:
        # Always release the PyMuPDF document handle, even on error.
        doc.close()

    if footnotes:
        parts.append(f"<FOOTNOTE>{footnotes.strip()}</FOOTNOTE>\n")
    if references:
        parts.append(f"<REFERENCE>{references.strip()}</REFERENCE>\n")

    print(f"✅ Finished Processing PDF: {os.path.basename(pdf_path)}")
    return {
        "filename": os.path.basename(pdf_path),
        "content": "".join(parts),
    }
|
|
|
def process_pdf_file(pdf_file, api_key, repo_address):
    """Convert an uploaded PDF to a one-row Parquet file; optionally upload it.

    Args:
        pdf_file: Gradio file object (has ``.name``) or a dict with a
            ``'name'`` key; ``None`` when nothing was uploaded.
        api_key: Hugging Face token. Upload is skipped when falsy.
        repo_address: Target Hub repo id (e.g. ``"user/dataset"``).
            Upload is skipped when falsy.

    Returns:
        (parquet_filename, status_message) tuple; ``parquet_filename`` is
        ``None`` when no PDF was provided.
    """
    if pdf_file is None:
        return None, "No PDF file uploaded."

    # Gradio may hand us either an object with .name or a plain dict.
    file_path = pdf_file.name if hasattr(pdf_file, "name") else pdf_file['name']
    result = extract_full_paper_with_labels(file_path)

    # One row per document: filename + tagged content, written next to CWD.
    df = pd.DataFrame([result])
    base = os.path.splitext(result['filename'])[0]
    parquet_filename = f"{base}.parquet"
    df.to_parquet(parquet_filename, index=False)

    if api_key and repo_address:
        api = HfApi()
        try:
            api.upload_file(
                path_or_fileobj=parquet_filename,
                path_in_repo=parquet_filename,
                repo_id=repo_address,
                token=api_key
            )
            repo_status = f"File uploaded to repo {repo_address} successfully."
        except HfHubHTTPError as e:
            # Hub-side failure (bad token, missing repo, rate limit, ...).
            repo_status = f"Failed to upload to repo: {str(e)}"
        except Exception as e:
            # Anything else (network, filesystem); report instead of crashing the UI.
            repo_status = f"Failed to upload to repo: {str(e)}"
    else:
        repo_status = "API key or repo address not provided, skipping repo upload."

    return parquet_filename, repo_status
|
|
|
|
|
def clear_files():
    """Reset the UI: clear the uploaded PDF, the download slot, and the status text."""
    cleared_upload = None
    cleared_download = None
    cleared_status = ""
    return cleared_upload, cleared_download, cleared_status
|
|
|
|
|
# --- Gradio UI wiring ------------------------------------------------------
# Statement order inside each `with` context determines on-screen layout:
# row 1 = credentials, row 2 = upload + action buttons, row 3 = outputs.
with gr.Blocks() as demo:
    with gr.Row():
        # Hugging Face token and target repo id; both optional — the upload
        # step is skipped when either is empty (handled in process_pdf_file).
        api_key_input = gr.Textbox(label="API Key", placeholder="Enter API Key")
        repo_address_input = gr.Textbox(label="Repo Address", placeholder="Enter Repo Address")
    with gr.Row():
        pdf_file_input = gr.File(label="Upload PDF")
        convert_button = gr.Button("Convert to Parquet")
        clear_button = gr.Button("Clear Files")
    with gr.Row():
        # Outputs: the generated .parquet file and the Hub upload status text.
        download_file_output = gr.File(label="Download Parquet File")
        repo_status_output = gr.Textbox(label="Repo Upload Status")

    # Convert: PDF -> tagged text -> Parquet (+ optional Hub upload).
    convert_button.click(
        process_pdf_file,
        inputs=[pdf_file_input, api_key_input, repo_address_input],
        outputs=[download_file_output, repo_status_output]
    )

    # Clear: reset the upload widget, download slot, and status textbox.
    clear_button.click(
        clear_files,
        inputs=None,
        outputs=[pdf_file_input, download_file_output, repo_status_output]
    )

demo.launch()
|
|