import os
import re
import time
import dotenv
import pandas as pd
import requests
import schedule
import srsly
from bs4 import BeautifulSoup
from datasets import Dataset, load_dataset
from huggingface_hub import create_repo, login, whoami
from retry import retry
from tqdm.auto import tqdm
dotenv.load_dotenv()
login(token=os.environ.get("HF_TOKEN"))
hf_user = whoami(os.environ.get("HF_TOKEN"))["name"]
HF_REPO_ID = f"{hf_user}/zotero-answer-ai-articles"
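# HF dataset repo holding three configs: "abstracts", "articles" and "processed_arxiv_ids"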
########################################################
### GET ZOTERO ITEMS
########################################################
@retry(tries=3, delay=8)
def _fetch_one_zotero_batch(url, headers, params):
"""
    Fetch a single batch of items from the Zotero API.
"""
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()
def get_zotero_items(debug=False):
"""
    Fetch all items from the Zotero group library, paginating through the API.
"""
GROUP_ID = os.getenv("GROUP_ID")
API_KEY = os.getenv("API_KEY")
BASE_URL = f"https://api.zotero.org/groups/{GROUP_ID}/items"
LIMIT = 100
headers = {"Zotero-API-Key": API_KEY, "Content-Type": "application/json"}
items = []
start = 0
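    # page through the Zotero API with limit/start until an empty batch is returned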
    while True:
params = {"limit": LIMIT, "start": start}
page_items = _fetch_one_zotero_batch(BASE_URL, headers, params)
if not page_items:
break
items.extend(page_items)
start += LIMIT
print(f"# items fetched {len(items)}")
        if debug and len(items) > 300:
            break
return items
########################################################
### EXTRACT ARXIV LINKS AND PDFs
########################################################
def get_arxiv_items(items):
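    # collect unique arxiv entries (url, pdf attachment, metadata), deduplicated by arxiv id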
visited = set()
arxiv_items = []
    arxiv_pattern = re.compile(r"arxiv\.org/abs/(\d+\.\d+)")
for item in items:
data = item.get("data", {})
attachments = item.get("links", {}).get("attachment", {})
arxiv_url = None
pdf_url = None
if "url" in data and "arxiv.org" in data["url"]:
arxiv_match = arxiv_pattern.search(data["url"])
if arxiv_match:
arxiv_url = data["url"]
if attachments:
pdf_url = attachments["href"]
if arxiv_url:
arxiv_id = arxiv_url.split("/")[-1]
if arxiv_id in visited:
continue
arxiv_items.append(
{
"arxiv_id": arxiv_id,
"arxiv_url": arxiv_url,
"pdf_url": pdf_url,
"added_by": item["meta"]["createdByUser"]["username"],
"date_added": data.get("dateAdded", ""),
}
)
visited.add(arxiv_id)
return arxiv_items
@retry(tries=3, delay=15, backoff=2)
def fetch_arxiv_html(arxiv_id):
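    # ar5iv serves an HTML rendering of the paper; the version suffix is dropped from the id.
    # a non-200 response returns None and is not retried, since no exception is raised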
url = f"https://ar5iv.labs.arxiv.org/html/{arxiv_id.split('v')[0]}"
response = requests.get(url)
return response.text if response.status_code == 200 else None
def fetch_arxiv_htmls(arxiv_items):
for item in tqdm(arxiv_items):
html = fetch_arxiv_html(item["arxiv_id"])
if html:
item["raw_html"] = html
else:
print(f"failed to fetch html for {item['arxiv_id']}")
item["raw_html"] = "Error"
return arxiv_items
########################################################
### PARSE CONTENT FROM ARXIV HTML
########################################################
def parse_html_content(html):
"""
    Parse the title, abstract and section texts from an ar5iv HTML page.
"""
arxiv_id_match = re.search(r"\[(\d+\.\d+(v\d+)?)\]", html)
arxiv_id = arxiv_id_match.group(1) if arxiv_id_match else None
soup = BeautifulSoup(html, "html.parser")
result = []
# Extract paper title
try:
paper_title = soup.find("h1", class_="ltx_title ltx_title_document").get_text(strip=True)
except Exception:
paper_title = soup.find("title").get_text(strip=True)
paper_title = re.sub(r"^\[\d+\.\d+(v\d+)?\]\s*", "", paper_title)
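    # drop inline math and citation nodes before extracting plain text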
for math in soup.find_all("math"):
math.decompose()
for cite in soup.find_all("cite"):
cite.decompose()
# Extract abstract
abstract = soup.find("div", class_="ltx_abstract")
if abstract:
result.append(
{
"content": " ".join(p.get_text(strip=True) for p in abstract.find_all("p")).replace(")", ") "),
"title": "Abstract",
"paper_title": paper_title,
"content_type": "abstract",
}
)
# Extract sections
sections = soup.find_all("section", class_="ltx_section")
for index, section in enumerate(sections):
section_title = section.find("h2", class_="ltx_title ltx_title_section")
section_title = section_title.get_text(strip=True) if section_title else f"Section {index + 1}"
section_content = section.get_text(strip=True).replace(")", ") ")
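        # heuristic: treat the first section as the introduction and the last as the conclusion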
content_type = "body"
if index == 0:
content_type = "introduction"
elif index == len(sections) - 1:
content_type = "conclusion"
result.append(
{
"content": section_content,
"title": section_title,
"paper_title": paper_title,
"content_type": content_type,
}
)
for c in result:
c["arxiv_id"] = arxiv_id
return result
########################################################
### GET TEXTS FROM PDF & PARSE
########################################################
def get_pdf_text(arxiv_id):
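    # external PDF-to-markdown extraction service; expected to return JSON with a "text" field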
url = "http://147.189.194.113:80/extract" # fix: currently down
try:
response = requests.get(url, params={"arxiv_id": arxiv_id})
response = response.json()
if "text" in response:
return response["text"]
return None
except Exception as e:
print(e)
return None
def get_content_type(section_type, section_count):
"""Determine the content type based on the section type and count"""
if section_type == "abstract":
return "abstract"
elif section_type == "introduction" or section_count == 1:
return "introduction"
elif section_type == "conclusion" or section_type == "references":
return section_type
else:
return "body"
def get_section_type(title):
"""Determine the section type based on the title"""
title_lower = title.lower()
if "abstract" in title_lower:
return "abstract"
elif "introduction" in title_lower:
return "introduction"
elif "conclusion" in title_lower:
return "conclusion"
elif "reference" in title_lower:
return "references"
else:
return "body"
def parse_markdown_content(md_content, arxiv_id):
"""
Parses markdown content to identify and extract sections based on headers.
"""
lines = md_content.split("\n")
parsed = []
current_section = None
content = []
paper_title = None
current_title = None
# identify sections based on headers
for line in lines:
if line.startswith("#"):
if paper_title is None:
paper_title = line.lstrip("#").strip()
continue
if content:
if current_title:
parsed.append(
{
"content": " ".join(content),
"title": current_title,
"paper_title": paper_title,
"content_type": get_content_type(current_section, len(parsed)),
"arxiv_id": arxiv_id,
}
)
content = []
            current_title = line.lstrip("#").lstrip()
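            # strip a leading section number (e.g. "2.1 ") unless the title contains "bit",
            # so names like "8bit" keep their digits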
if "bit" not in current_title:
current_title = (
current_title.lstrip("123456789")
.lstrip()
.lstrip(".")
.lstrip()
.lstrip("123456789")
.lstrip()
.lstrip(".")
.lstrip()
)
current_section = get_section_type(current_title)
else:
content.append(line)
# Add the last section
if content and current_title:
parsed.append(
{
"content": " ".join(content).replace(")", ") "),
"title": current_title,
"paper_title": paper_title,
"content_type": get_content_type(current_section, len(parsed)),
"arxiv_id": arxiv_id,
}
)
return parsed
########################################################
### HF UPLOAD
########################################################
def upload_to_hf(abstract_df, contents_df, processed_arxiv_ids):
repo_id = HF_REPO_ID
create_repo(
repo_id=repo_id,
token=os.environ.get("HF_TOKEN"),
private=True,
repo_type="dataset",
exist_ok=True,
)
# push id_to_abstract
abstract_ds = Dataset.from_pandas(abstract_df)
abstract_ds.push_to_hub(repo_id, "abstracts", token=os.environ.get("HF_TOKEN"))
# push arxiv_items
arxiv_ds = Dataset.from_pandas(contents_df)
arxiv_ds.push_to_hub(repo_id, "articles", token=os.environ.get("HF_TOKEN"))
# push processed_arxiv_ids
processed_arxiv_ids = [{"arxiv_id": arxiv_id} for arxiv_id in processed_arxiv_ids]
processed_arxiv_ids_ds = Dataset.from_list(processed_arxiv_ids)
processed_arxiv_ids_ds.push_to_hub(repo_id, "processed_arxiv_ids", token=os.environ.get("HF_TOKEN"))
########################################################
### MAIN
########################################################
def main():
items = get_zotero_items(debug=True)
print(f"# of items fetched from zotero: {len(items)}")
arxiv_items = get_arxiv_items(items)
print(f"# of arxiv papers: {len(arxiv_items)}")
# get already processed arxiv ids from HF
try:
existing_arxiv_ids = load_dataset(HF_REPO_ID, "processed_arxiv_ids")["train"]["arxiv_id"]
except Exception as e:
print(e)
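        # fall back to the local copy written by a previous run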
try:
existing_arxiv_ids = srsly.read_json("data/processed_arxiv_ids.json")
except Exception as e:
print(e)
existing_arxiv_ids = []
existing_arxiv_ids = set(existing_arxiv_ids)
print(f"# of existing arxiv ids: {len(existing_arxiv_ids)}")
# new arxiv items
arxiv_items = [item for item in arxiv_items if item["arxiv_id"] not in existing_arxiv_ids]
arxiv_items = fetch_arxiv_htmls(arxiv_items)
print(f"# of new arxiv items: {len(arxiv_items)}")
processed_arxiv_ids = set()
for item in arxiv_items:
try:
item["contents"] = parse_html_content(item["raw_html"])
processed_arxiv_ids.add(item["arxiv_id"])
except Exception as e:
print(f"Failed to parse html for {item['arxiv_id']}: {e}")
item["contents"] = []
if len(item["contents"]) == 0:
print("Extracting from pdf...")
md_content = get_pdf_text(item["arxiv_id"]) # fix this
if md_content:
item["contents"] = parse_markdown_content(md_content, item["arxiv_id"])
processed_arxiv_ids.add(item["arxiv_id"])
else:
item["contents"] = []
    # processed arxiv ids ---
processed_arxiv_ids = list(processed_arxiv_ids)
print(f"# of processed arxiv ids: {len(processed_arxiv_ids)}")
# save abstracts ---
id_to_abstract = {}
for item in arxiv_items:
for entry in item["contents"]:
if entry["content_type"] == "abstract":
id_to_abstract[item["arxiv_id"]] = entry["content"]
break
print(f"# of abstracts: {len(id_to_abstract)}")
abstract_df = pd.Series(id_to_abstract).reset_index().rename(columns={"index": "arxiv_id", 0: "abstract"})
print(abstract_df.head())
# add to existing dataset
try:
old_abstract_df = load_dataset(HF_REPO_ID, "abstracts")["train"].to_pandas()
except Exception as e:
print(e)
old_abstract_df = pd.DataFrame(columns=abstract_df.columns)
print(old_abstract_df.head())
abstract_df = pd.concat([old_abstract_df, abstract_df]).reset_index(drop=True)
abstract_df = abstract_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True)
# contents
contents_df = pd.DataFrame(arxiv_items)
print(contents_df.head())
try:
old_contents_df = load_dataset(HF_REPO_ID, "articles")["train"].to_pandas()
except Exception as e:
print(e)
old_contents_df = pd.DataFrame(columns=contents_df.columns)
if len(old_contents_df) > 0:
print(old_contents_df.sample().T)
contents_df = pd.concat([old_contents_df, contents_df]).reset_index(drop=True)
contents_df = contents_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True)
    # upload to hf (include previously processed ids so the full set is preserved)
    processed_arxiv_ids = list(set(processed_arxiv_ids) | existing_arxiv_ids)
upload_to_hf(abstract_df, contents_df, processed_arxiv_ids)
# save as local copy
os.makedirs("data", exist_ok=True)
abstract_df.to_parquet("data/abstracts.parquet")
contents_df.to_parquet("data/contents.parquet")
srsly.write_json("data/processed_arxiv_ids.json", processed_arxiv_ids)
def schedule_periodic_task():
"""
Schedule the main task to run at the user-defined frequency
"""
main() # run once initially
frequency = "daily" # TODO: env
if frequency == "hourly":
print("Scheduling tasks to run every hour at the top of the hour")
schedule.every().hour.at(":00").do(main)
elif frequency == "daily":
start_time = "10:00"
print("Scheduling tasks to run every day at: {start_time} UTC+00")
schedule.every().day.at(start_time).do(main)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
schedule_periodic_task()