import gradio as gr
import pandas as pd
import fitz  # PyMuPDF
import os
import re
import time

from huggingface_hub import HfApi
from huggingface_hub.utils import HfHubHTTPError

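# A note on dependencies (an assumption; nothing is pinned in the original snippet):
#   pip install gradio pandas pymupdf pyarrow huggingface_hub
# pyarrow is needed because DataFrame.to_parquet() below uses engine='pyarrow'.
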
def sanitize_filename(title):
    # Strip characters that are invalid in file names and replace spaces with underscores.
    sanitized = re.sub(r'[\\/*?:"<>|]', "", title)
    return sanitized.replace(" ", "_")


def extract_full_paper_with_labels(pdf_path, progress=None):
    print(f"📄 Starting PDF Processing: {os.path.basename(pdf_path)}")
    doc = fitz.open(pdf_path)
    content = ""

    # Accumulators for metadata and special sections
    title = ""
    authors = ""
    year = ""
    doi = ""
    abstract = ""
    footnotes = ""
    references = ""
    sources = ""
    total_pages = len(doc)
    max_iterations = total_pages * 2  # defensive guard against malformed PDFs
    iteration_count = 0

    # Patterns used to classify text blocks
    doi_pattern = r"\b10\.\d{4,9}/[-._;()/:A-Za-z0-9]+\b"
    year_pattern = r"\b(19|20)\d{2}\b"
    code_pattern = r"(def\s+\w+\s*\(|class\s+\w+|import\s+\w+|for\s+\w+\s+in|if\s+\w+|while\s+\w+|try:|except|\{|\}|;)"
    reference_keywords = ['reference', 'bibliography', 'sources']
    financial_keywords = ['p/e', 'volatility', 'market cap', 'roi', 'sharpe', 'drawdown']

    for page_num, page in enumerate(doc):
        iteration_count += 1
        if iteration_count > max_iterations:
            raise Exception("⚠️ PDF processing exceeded iteration limit. Possible malformed PDF.")

        if progress is not None:
            progress((page_num + 1) / total_pages, desc=f"Processing Page {page_num + 1}/{total_pages}")

        blocks = page.get_text("dict")["blocks"]
        for block in blocks:
            if "lines" in block:
                text = ""
                max_font_size = 0
                for line in block["lines"]:
                    for span in line["spans"]:
                        text += span["text"] + " "
                        if span["size"] > max_font_size:
                            max_font_size = span["size"]

                text = text.strip()

                # Title: large text on the first page
                if page_num == 0 and max_font_size > 15 and not title:
                    title = text
                    content += f"<TITLE>{title}</TITLE>\n"

                # Authors
                elif re.search(r'author|by', text, re.IGNORECASE) and not authors:
                    authors = text
                    content += f"<AUTHORS>{authors}</AUTHORS>\n"

                # Publication year
                elif re.search(year_pattern, text) and not year:
                    year = re.search(year_pattern, text).group(0)
                    content += f"<YEAR>{year}</YEAR>\n"

                # DOI
                elif re.search(doi_pattern, text) and not doi:
                    doi = re.search(doi_pattern, text).group(0)
                    content += f"<DOI>{doi}</DOI>\n"

                # Abstract
                elif "abstract" in text.lower() and not abstract:
                    abstract = text
                    content += f"<ABSTRACT>{abstract}</ABSTRACT>\n"

                # Small print is treated as footnote material
                elif max_font_size < 10:
                    footnotes += text + " "

                # Reference / bibliography sections
                elif any(keyword in text.lower() for keyword in reference_keywords):
                    references += text + " "

                # Tables
                elif re.search(r"table\s*\d+", text, re.IGNORECASE):
                    content += f"<TABLE>{text}</TABLE>\n"

                # Figures
                elif re.search(r"figure\s*\d+", text, re.IGNORECASE):
                    content += f"<FIGURE>{text}</FIGURE>\n"

elif re.search(r"=|β|β|Β±|Γ|Ο|ΞΌ|Ο", text): |
|
content += f"<EQUATION>{text}</EQUATION>\n" |
|
|
|
|
|
                # Code snippets (kept short to avoid misclassifying prose)
                elif re.search(code_pattern, text) and len(text.split()) <= 50:
                    content += f"<CODE>{text}</CODE>\n"

                # Financial metrics
                elif any(fin_kw in text.lower() for fin_kw in financial_keywords):
                    content += f"<FINANCIAL_METRIC>{text}</FINANCIAL_METRIC>\n"

                # Everything else is a regular paragraph
                else:
                    content += f"<PARAGRAPH>{text}</PARAGRAPH>\n"

    if footnotes:
        content += f"<FOOTNOTE>{footnotes.strip()}</FOOTNOTE>\n"
    if references:
        content += f"<REFERENCE>{references.strip()}</REFERENCE>\n"

print(f"β
Finished Processing PDF: {os.path.basename(pdf_path)}") |
|
return { |
|
"filename": os.path.basename(pdf_path), |
|
"title": title if title else "Untitled_Paper", |
|
"content": content |
|
} |
|
|
|
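# upload_with_progress() is called in pdf_to_parquet_and_upload() below but was
# not defined in this snippet. The following is a minimal sketch of what it
# could look like, assuming the Parquet file should be pushed to a Hugging Face
# dataset repo with HfApi.upload_file(); adjust to match the original helper.
def upload_with_progress(parquet_file, dataset_repo_id, hf_token, progress=None):
    api = HfApi()
    if progress is not None:
        progress(0.0, desc=f"Uploading {os.path.basename(parquet_file)}")
    try:
        # Create the dataset repo if it does not exist yet (no-op otherwise).
        api.create_repo(repo_id=dataset_repo_id, token=hf_token, repo_type="dataset", exist_ok=True)
        api.upload_file(
            path_or_fileobj=parquet_file,
            path_in_repo=os.path.basename(parquet_file),
            repo_id=dataset_repo_id,
            repo_type="dataset",
            token=hf_token,
        )
    except HfHubHTTPError as e:
        # Surface HTTP errors (invalid token, missing permissions, ...) to the caller.
        raise Exception(f"Hugging Face upload failed: {e}")
    if progress is not None:
        progress(1.0, desc=f"Uploaded {os.path.basename(parquet_file)}")
    return f"✅ Uploaded {os.path.basename(parquet_file)} to {dataset_repo_id}"

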
def pdf_to_parquet_and_upload(pdf_files, hf_token, dataset_repo_id, action_choice, progress=gr.Progress()):
    upload_message = ""

    total_files = len(pdf_files)
    print("🚀 Starting PDF to Parquet Conversion Process")

    # NOTE: when several PDFs are uploaded, only the last file's Parquet path
    # and status message are returned to the UI.
    for idx, pdf_file in enumerate(pdf_files):
        if progress is not None:
            progress(idx / total_files, desc=f"Processing File {idx + 1}/{total_files}")

        # Extract labelled content from the PDF
        extracted_data = extract_full_paper_with_labels(pdf_file.name, progress=progress)

        # Name the Parquet file after the paper title
        sanitized_title = sanitize_filename(extracted_data["title"])
        parquet_file = f"{sanitized_title}.parquet"

        df = pd.DataFrame([extracted_data])

        try:
            df.to_parquet(parquet_file, engine='pyarrow', index=False)
            print(f"✅ Parquet saved as: {parquet_file}")
        except Exception as e:
            print(f"❌ Parquet Conversion Failed: {str(e)}")
            return None, f"❌ Parquet Conversion Failed: {str(e)}"

        if action_choice in ["Upload to Hugging Face", "Both"]:
            try:
                upload_message = upload_with_progress(parquet_file, dataset_repo_id, hf_token, progress)
            except Exception as e:
                print(f"❌ Upload Failed: {str(e)}")
                upload_message = f"❌ Upload failed: {str(e)}"

    print("🏁 Process Completed")
    return parquet_file, upload_message


iface = gr.Interface(
    fn=pdf_to_parquet_and_upload,
    inputs=[
        gr.File(file_types=[".pdf"], file_count="multiple", label="Upload PDFs (Drag & Drop or Search)"),
        gr.Textbox(label="Hugging Face API Token", type="password", placeholder="Enter your Hugging Face API token"),
        gr.Textbox(label="Your Dataset Repo ID (e.g., username/research-dataset)", placeholder="username/research-dataset"),
        gr.Radio(["Download Locally", "Upload to Hugging Face", "Both"], label="Action", value="Download Locally")
    ],
    outputs=[
        gr.File(label="Download Parquet File"),
        gr.Textbox(label="Status")
    ],
    title="PDF to Parquet Converter with Title-Based Naming",
    description="Upload your PDFs, convert them to Parquet files named after the paper title, and upload them to your Hugging Face Dataset."
)

iface.launch()