import os
import re
import time

import fitz  # PyMuPDF
import gradio as gr
import pandas as pd
from huggingface_hub import HfApi
from huggingface_hub.utils import HfHubHTTPError
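
# Requires: gradio, pandas, pymupdf (imported as fitz), pyarrow, and huggingface_hub.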


def sanitize_title(title, max_length=100):
    """
    Sanitize a paper title for safe use as a filename.

    Removes non-alphanumeric characters (except underscores and hyphens),
    collapses runs of whitespace and hyphens into underscores, and truncates
    to max_length characters.
    """
    sanitized = re.sub(r'[^\w\s-]', '', title).strip()
    sanitized = re.sub(r'[-\s]+', '_', sanitized)
    return sanitized[:max_length]
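
# Example: sanitize_title("Deep Learning: A Survey (2020)!") -> "Deep_Learning_A_Survey_2020"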


def extract_full_paper_with_labels(pdf_path, progress=None):
    """
    Extract text from a PDF and wrap each block in a semantic tag
    (<TITLE>, <ABSTRACT>, <EQUATION>, <CODE>, ...) based on simple
    layout and keyword heuristics.
    """
    print(f"🔍 Starting PDF Processing: {os.path.basename(pdf_path)}")
    doc = fitz.open(pdf_path)
    content = ""

    title = ""
    authors = ""
    year = ""
    doi = ""
    abstract = ""
    footnotes = ""
    references = ""

    total_pages = len(doc)
    max_iterations = total_pages * 2  # safety guard against malformed PDFs
    iteration_count = 0

    doi_pattern = r"\b10\.\d{4,9}/[-._;()/:A-Za-z0-9]+\b"
    year_pattern = r'\b(19|20)\d{2}\b'
    code_pattern = r"(def\s+\w+\s*\(|class\s+\w+|import\s+\w+|for\s+\w+\s+in|if\s+\w+|while\s+\w+|try:|except|\{|\}|;)"
    reference_keywords = ['reference', 'bibliography', 'sources']
    financial_keywords = ['p/e', 'volatility', 'market cap', 'roi', 'sharpe', 'drawdown']
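
    # The patterns above are tested in a fixed order inside the block loop:
    # title -> authors -> year -> DOI -> abstract -> footnote -> reference ->
    # table -> figure -> equation -> code -> financial metric -> paragraph.
    # The first match wins for each text block.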

    for page_num, page in enumerate(doc):
        iteration_count += 1
        if iteration_count > max_iterations:
            raise Exception("⚠️ PDF processing exceeded iteration limit. Possible malformed PDF.")

        if progress is not None:
            progress((page_num + 1) / total_pages, desc=f"Processing Page {page_num + 1}/{total_pages}")

        blocks = page.get_text("dict")["blocks"]
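        # get_text("dict") returns nested blocks -> lines -> spans; each span
        # carries its own text and font size, which drive the heuristics below.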
        for block in blocks:
            if "lines" in block:  # skip image-only blocks
                text = ""
                max_font_size = 0
                for line in block["lines"]:
                    for span in line["spans"]:
                        text += span["text"] + " "
                        max_font_size = max(max_font_size, span["size"])

                text = text.strip()
                if not text:
                    continue

                # Large font on the first page, before a title is found.
                if page_num == 0 and max_font_size > 15 and not title:
                    title = text
                    content += f"<TITLE>{title}</TITLE>\n"

                elif re.search(r'author|by', text, re.IGNORECASE) and not authors:
                    authors = text
                    content += f"<AUTHORS>{authors}</AUTHORS>\n"

                elif re.search(year_pattern, text) and not year:
                    year = re.search(year_pattern, text).group(0)
                    content += f"<YEAR>{year}</YEAR>\n"

                elif re.search(doi_pattern, text) and not doi:
                    doi = re.search(doi_pattern, text).group(0)
                    content += f"<DOI>{doi}</DOI>\n"

                elif "abstract" in text.lower() and not abstract:
                    abstract = text
                    content += f"<ABSTRACT>{abstract}</ABSTRACT>\n"

                # Small fonts are usually footnote text.
                elif max_font_size < 10:
                    footnotes += text + " "

                elif any(keyword in text.lower() for keyword in reference_keywords):
                    references += text + " "

                elif re.search(r"table\s*\d+", text, re.IGNORECASE):
                    content += f"<TABLE>{text}</TABLE>\n"

                elif re.search(r"figure\s*\d+", text, re.IGNORECASE):
                    content += f"<FIGURE>{text}</FIGURE>\n"

                # Mathematical symbols suggest an equation.
                elif re.search(r"=|∑|√|±|×|σ|μ|π", text):
                    content += f"<EQUATION>{text}</EQUATION>\n"

                # Code-like syntax in a short block.
                elif re.search(code_pattern, text) and len(text.split()) <= 50:
                    content += f"<CODE>{text}</CODE>\n"

                elif any(fin_kw in text.lower() for fin_kw in financial_keywords):
                    content += f"<FINANCIAL_METRIC>{text}</FINANCIAL_METRIC>\n"

                else:
                    content += f"<PARAGRAPH>{text}</PARAGRAPH>\n"

    if footnotes:
        content += f"<FOOTNOTE>{footnotes.strip()}</FOOTNOTE>\n"
    if references:
        content += f"<REFERENCE>{references.strip()}</REFERENCE>\n"

    print(f"✅ Finished Processing PDF: {os.path.basename(pdf_path)}")
    return {
        "filename": os.path.basename(pdf_path),
        "title": title,
        "content": content
    }
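
# Standalone usage sketch (assumes a local file such as "paper.pdf"):
#   record = extract_full_paper_with_labels("paper.pdf")
#   print(record["title"])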


def upload_with_progress(file_path, repo_id, token, progress):
    """
    Upload a file to a Hugging Face dataset repo using the upload_file() API.
    """
    file_size = os.path.getsize(file_path)
    print(f"📤 Starting upload of Parquet ({file_size} bytes): {file_path}")

    api = HfApi()
    try:
        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=os.path.basename(file_path),
            repo_id=repo_id,
            repo_type="dataset",
            token=token
        )
        if progress is not None:
            progress(1, desc="✅ Upload Complete")
        print(f"✅ Successfully uploaded to {repo_id}")
        return f"✅ Successfully uploaded to {repo_id}"
    except HfHubHTTPError as e:
        print(f"❌ Upload failed: {e}")
        return f"❌ Upload failed: {str(e)}"
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        return f"❌ Unexpected error: {str(e)}"
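
# The token must have write access to the target dataset repo; tokens are
# managed at https://huggingface.co/settings/tokens.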


def pdf_to_parquet_and_upload(pdf_files, hf_token, dataset_repo_id, action_choice, progress=gr.Progress()):
    """
    Convert the uploaded PDFs into a single labeled Parquet file and,
    depending on action_choice, upload it to the given dataset repo.
    """
    all_data = []
    total_files = len(pdf_files)
    print("🚀 Starting PDF to Parquet Conversion Process")

    for idx, pdf_file in enumerate(pdf_files):
        if progress is not None:
            progress(idx / total_files, desc=f"Processing File {idx + 1}/{total_files}")
        extracted_data = extract_full_paper_with_labels(pdf_file.name, progress=progress)
        all_data.append(extracted_data)

    print("💡 Converting Processed Data to Parquet")

    df = pd.DataFrame(all_data)

    # Single paper: name the file after its sanitized title. Multiple papers:
    # use a timestamped name so repeated batch runs don't overwrite each other.
    if len(all_data) == 1:
        paper_title = all_data[0].get("title", "").strip()
        if paper_title:
            safe_title = sanitize_title(paper_title)
            parquet_file = f"{safe_title}.parquet"
        else:
            parquet_file = 'fully_labeled_papers.parquet'
    else:
        parquet_file = f"fully_labeled_papers_{time.strftime('%Y%m%d_%H%M%S')}.parquet"

    try:
        df.to_parquet(parquet_file, engine='pyarrow', index=False)
        print("✅ Parquet Conversion Completed")
    except Exception as e:
        print(f"❌ Parquet Conversion Failed: {str(e)}")
        return None, f"❌ Parquet Conversion Failed: {str(e)}"

    upload_message = "Skipped Upload"
    if action_choice in ["Upload to Hugging Face", "Both"]:
        try:
            upload_message = upload_with_progress(parquet_file, dataset_repo_id, hf_token, progress)
        except Exception as e:
            print(f"❌ Upload Failed: {str(e)}")
            upload_message = f"❌ Upload failed: {str(e)}"

    print("🎉 Process Completed")
    return parquet_file, upload_message


def reset_files_fn():
    # Clear only the uploaded PDFs and the generated Parquet file.
    return None, None
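
# Returning None for a Gradio output resets that component, so the two Nones
# above clear the upload and download widgets without touching credentials.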


with gr.Blocks() as demo:
    gr.Markdown(
        """
        # PDF to Parquet Converter with Full Labeling

        **Clear All Inputs:** resets every field, including your API key and dataset repo ID.

        **Reset Files Only:** clears the PDF uploads and the generated Parquet file while keeping your credentials intact.
        """
    )

    with gr.Row():
        pdf_input = gr.File(file_types=[".pdf"], file_count="multiple", label="Upload PDFs (Drag & Drop or Search)")
    with gr.Row():
        hf_token = gr.Textbox(label="Hugging Face API Token", type="password", placeholder="Enter your Hugging Face API token")
        dataset_repo = gr.Textbox(label="Your Dataset Repo ID (e.g., username/research-dataset)", placeholder="username/research-dataset")
    with gr.Row():
        action_radio = gr.Radio(["Download Locally", "Upload to Hugging Face", "Both"], label="Action", value="Download Locally")

    with gr.Row():
        convert_button = gr.Button("Convert PDF to Parquet")
        reset_files_button = gr.Button("Reset Files Only")
        clear_all_button = gr.Button("Clear All Inputs")

    with gr.Row():
        output_file = gr.File(label="Download Parquet File")
        status_text = gr.Textbox(label="Status")

    convert_button.click(
        fn=pdf_to_parquet_and_upload,
        inputs=[pdf_input, hf_token, dataset_repo, action_radio],
        outputs=[output_file, status_text]
    )

    reset_files_button.click(
        fn=reset_files_fn,
        inputs=None,
        outputs=[pdf_input, output_file]
    )

    def clear_all_fn():
        return None, None, None, "Download Locally"

    clear_all_button.click(
        fn=clear_all_fn,
        inputs=None,
        outputs=[pdf_input, hf_token, dataset_repo, action_radio]
    )
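
# Serves the UI locally (default http://127.0.0.1:7860); pass share=True to
# demo.launch() for a temporary public link.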
demo.launch()