Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import time | |
| import dotenv | |
| import pandas as pd | |
| import requests | |
| import schedule | |
| import srsly | |
| from bs4 import BeautifulSoup | |
| from datasets import Dataset, load_dataset | |
| from huggingface_hub import create_repo, login, whoami | |
| from retry import retry | |
| from tqdm.auto import tqdm | |
# Pull variables from a local .env file into the environment, then
# authenticate against the Hugging Face Hub with HF_TOKEN.
dotenv.load_dotenv()
_HF_TOKEN = os.environ.get("HF_TOKEN")
login(token=_HF_TOKEN)

# The target dataset repo is namespaced under the account that owns the token.
hf_user = whoami(_HF_TOKEN)["name"]
HF_REPO_ID = f"{hf_user}/zotero-answer-ai-articles"
| ######################################################## | |
| ### GET ZOTERO ITEMS | |
| ######################################################## | |
def _fetch_one_zotero_batch(url, headers, params, timeout=30):
    """
    Fetch one page of items from the Zotero API

    Args:
        url: endpoint to query.
        headers: HTTP headers sent with the request.
        params: query-string parameters for the request.
        timeout: seconds to wait for the server. Without a timeout a stalled
            connection would block the calling loop indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the response status is 4xx/5xx.
        requests.Timeout: if the server does not answer within ``timeout``.
    """
    response = requests.get(url, headers=headers, params=params, timeout=timeout)
    response.raise_for_status()
    return response.json()
def get_zotero_items(debug=False):
    """
    Fetch items from the Zotero group library, one page at a time

    The group id and API key are read from the ``GROUP_ID`` and ``API_KEY``
    environment variables.

    Args:
        debug: when true, stop paging as soon as more than 300 items have
            been collected.

    Returns:
        A list with the items of every page fetched, in request order.

    Raises:
        ValueError: if ``GROUP_ID`` is not set.
        requests.HTTPError: propagated from the page fetch on a 4xx/5xx reply.
    """
    group_id = os.getenv("GROUP_ID")
    if not group_id:
        # Fail early with a clear message instead of querying ".../groups/None/items".
        raise ValueError("GROUP_ID environment variable is not set")
    api_key = os.getenv("API_KEY")

    base_url = f"https://api.zotero.org/groups/{group_id}/items"
    page_size = 100
    debug_cap = 300
    headers = {"Zotero-API-Key": api_key, "Content-Type": "application/json"}

    items = []
    start = 0
    while True:
        params = {"limit": page_size, "start": start}
        page_items = _fetch_one_zotero_batch(base_url, headers, params)

        # An empty page means we have walked past the last item.
        if not page_items:
            break

        items.extend(page_items)
        start += page_size
        print(f"# items fetched {len(items)}")

        if debug and len(items) > debug_cap:
            break

    return items
| ######################################################## | |
| ### EXTRACT ARXIV LINKS AND PDFs | |
| ######################################################## | |
def get_arxiv_items(items):
    """
    Select the Zotero items that point at an arXiv abstract page

    Each item is expected to be a dict; the fields read are ``data.url``,
    ``data.dateAdded``, ``links.attachment.href`` and
    ``meta.createdByUser.username``. Any of them may be missing.

    Args:
        items: iterable of Zotero item dicts.

    Returns:
        A list of dicts with the keys ``arxiv_id``, ``arxiv_url``,
        ``pdf_url``, ``added_by`` and ``date_added``. Only the first item
        seen for a given arXiv id is kept.
    """
    # Capture the identifier (plus an optional version suffix) straight from
    # the URL. Splitting on "/" breaks on trailing slashes and query strings.
    arxiv_pattern = re.compile(r"arxiv\.org/abs/(\d+\.\d+(?:v\d+)?)")

    visited = set()
    arxiv_items = []
    for item in items:
        data = item.get("data") or {}
        url = data.get("url") or ""
        match = arxiv_pattern.search(url)
        if not match:
            continue

        arxiv_id = match.group(1)
        if arxiv_id in visited:
            continue
        visited.add(arxiv_id)

        attachment = (item.get("links") or {}).get("attachment") or {}
        created_by = (item.get("meta") or {}).get("createdByUser") or {}
        arxiv_items.append(
            {
                "arxiv_id": arxiv_id,
                "arxiv_url": url,
                "pdf_url": attachment.get("href"),
                "added_by": created_by.get("username", ""),
                "date_added": data.get("dateAdded", ""),
            }
        )
    return arxiv_items
def fetch_arxiv_html(arxiv_id, timeout=30):
    """
    Download the ar5iv HTML rendering of one paper

    Args:
        arxiv_id: arXiv identifier; everything from the first "v" on
            (the version suffix) is dropped before building the URL.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The response body as text when the status code is 200, otherwise
        ``None``. Network failures are reported and also yield ``None``, so
        one unreachable paper does not abort a whole batch.
    """
    base_id = arxiv_id.split("v")[0]
    url = f"https://ar5iv.labs.arxiv.org/html/{base_id}"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        print(f"request to {url} failed: {exc}")
        return None
    return response.text if response.status_code == 200 else None
def fetch_arxiv_htmls(arxiv_items):
    """
    Attach the downloaded HTML to every item under the "raw_html" key

    Items are modified in place. When the download yields nothing, a message
    is printed and the placeholder string "Error" is stored instead. The same
    list is returned.
    """
    for entry in tqdm(arxiv_items):
        page = fetch_arxiv_html(entry["arxiv_id"])
        if not page:
            print(f"failed to fetch html for {entry['arxiv_id']}")
            page = "Error"
        entry["raw_html"] = page
    return arxiv_items
| ######################################################## | |
| ### PARSE CONTENT FROM ARXIV HTML # | |
| ######################################################## | |
def parse_html_content(html):
    """
    Split an arxiv html page into one record for the abstract and one per section

    Every record is a dict with the keys "content", "title", "paper_title",
    "content_type" and "arxiv_id". The arxiv id is the first bracketed
    "[NNNN.NNNN]" (optionally versioned) token found in the raw html, or None.
    An exception is raised when neither the document heading nor a <title>
    element is present.
    """
    id_match = re.search(r"\[(\d+\.\d+(v\d+)?)\]", html)
    arxiv_id = None if id_match is None else id_match.group(1)

    soup = BeautifulSoup(html, "html.parser")

    # Paper title: the document heading if present, otherwise the <title>
    # element with its leading "[id]" prefix removed.
    try:
        paper_title = soup.find("h1", class_="ltx_title ltx_title_document").get_text(strip=True)
    except Exception:
        page_title = soup.find("title").get_text(strip=True)
        paper_title = re.sub(r"^\[\d+\.\d+(v\d+)?\]\s*", "", page_title)

    # Drop formulas first, then citations, so neither ends up in the text.
    for tag_name in ("math", "cite"):
        for node in soup.find_all(tag_name):
            node.decompose()

    def make_record(content, title, content_type):
        return {
            "content": content,
            "title": title,
            "paper_title": paper_title,
            "content_type": content_type,
            "arxiv_id": arxiv_id,
        }

    records = []

    # Abstract: the paragraphs joined by single spaces.
    abstract_div = soup.find("div", class_="ltx_abstract")
    if abstract_div:
        paragraphs = [p.get_text(strip=True) for p in abstract_div.find_all("p")]
        records.append(make_record(" ".join(paragraphs).replace(")", ") "), "Abstract", "abstract"))

    # Sections: the first one is labelled introduction, the last one conclusion.
    sections = soup.find_all("section", class_="ltx_section")
    last_index = len(sections) - 1
    for index, section in enumerate(sections):
        heading = section.find("h2", class_="ltx_title ltx_title_section")
        if heading:
            section_title = heading.get_text(strip=True)
        else:
            section_title = f"Section {index + 1}"

        if index == 0:
            content_type = "introduction"
        elif index == last_index:
            content_type = "conclusion"
        else:
            content_type = "body"

        section_text = section.get_text(strip=True).replace(")", ") ")
        records.append(make_record(section_text, section_title, content_type))

    return records
| ######################################################## | |
| ### GET TEXTS FROM PDF & PARSE | |
| ######################################################## | |
def get_pdf_text(arxiv_id, timeout=30):
    """
    Ask the PDF extraction service for the text of one paper

    Args:
        arxiv_id: identifier sent as the ``arxiv_id`` query parameter.
        timeout: seconds to wait for the service. The endpoint is flagged as
            unreliable below, so an unbounded wait could stall every run.

    Returns:
        The value of the ``"text"`` field of the JSON reply, or ``None`` when
        the field is absent, the reply is not a JSON object, or the request
        fails for any reason.
    """
    url = "http://147.189.194.113:80/extract"  # fix: currently down
    try:
        response = requests.get(url, params={"arxiv_id": arxiv_id}, timeout=timeout)
        payload = response.json()
    except Exception as e:
        # Deliberately best-effort: this is only a fallback path, so any
        # failure is reported and treated as "no text available".
        print(e)
        return None
    if isinstance(payload, dict):
        return payload.get("text")
    return None
def get_content_type(section_type, section_count):
    """Map a section type and its position count to the stored content type"""
    if section_type == "abstract":
        return "abstract"
    # A count of exactly 1 is treated as the introduction regardless of the
    # section's own type (abstract excepted).
    if section_count == 1 or section_type == "introduction":
        return "introduction"
    if section_type in ("conclusion", "references"):
        return section_type
    return "body"
def get_section_type(title):
    """Classify a section title by the first known keyword it contains"""
    lowered = title.lower()
    # Order matters: earlier keywords win when a title contains several.
    keyword_to_type = (
        ("abstract", "abstract"),
        ("introduction", "introduction"),
        ("conclusion", "conclusion"),
        ("reference", "references"),
    )
    for keyword, section_type in keyword_to_type:
        if keyword in lowered:
            return section_type
    return "body"
def parse_markdown_content(md_content, arxiv_id):
    """
    Parse markdown content into section records based on header lines.

    The first line starting with "#" is taken as the paper title. Every later
    header starts a new section, and the lines up to the next header are
    joined with single spaces as that section's content. Text that appears
    before the first section header, and sections without any content lines,
    are dropped.

    Args:
        md_content: the markdown text of one paper.
        arxiv_id: identifier copied into every record.

    Returns:
        A list of dicts with the keys "content", "title", "paper_title",
        "content_type" and "arxiv_id".
    """
    # Leading section numbering such as "3", "3.", "3.1" or "10.2.1.". A
    # character-set lstrip without "0" used to turn "10. Results" into
    # "0. Results" and stopped after two numbering levels.
    numbering = re.compile(r"^(?:\d+[\s.]*)+")

    parsed = []
    content = []
    paper_title = None
    current_title = None
    current_section = None

    def make_entry(text):
        return {
            "content": text,
            "title": current_title,
            "paper_title": paper_title,
            "content_type": get_content_type(current_section, len(parsed)),
            "arxiv_id": arxiv_id,
        }

    for line in md_content.split("\n"):
        if not line.startswith("#"):
            content.append(line)
            continue

        heading = line.lstrip("#").strip()
        if paper_title is None:
            paper_title = heading
            continue

        # A new header closes the section collected so far.
        if content:
            if current_title:
                parsed.append(make_entry(" ".join(content)))
            content = []

        current_title = heading
        # Titles mentioning "bit" (e.g. "8-bit ...") start with a meaningful
        # number, so their leading digits are kept.
        if "bit" not in current_title:
            current_title = numbering.sub("", current_title)
        current_section = get_section_type(current_title)

    # Add the last section.
    # NOTE(review): only this final section gets the ")" -> ") " spacing;
    # earlier sections are stored without it. Kept as-is; confirm the intent.
    if content and current_title:
        parsed.append(make_entry(" ".join(content).replace(")", ") ")))

    return parsed
| ######################################################## | |
| ### HF UPLOAD | |
| ######################################################## | |
def upload_to_hf(abstract_df, contents_df, processed_arxiv_ids):
    """
    Push the three dataset configurations to the private dataset repo

    The repo named by HF_REPO_ID is created if needed, then "abstracts",
    "articles" and "processed_arxiv_ids" are pushed in that order, all
    authenticated with the HF_TOKEN environment variable.
    """
    token = os.environ.get("HF_TOKEN")
    create_repo(
        repo_id=HF_REPO_ID,
        token=token,
        private=True,
        repo_type="dataset",
        exist_ok=True,
    )

    # arxiv id -> abstract table
    Dataset.from_pandas(abstract_df).push_to_hub(HF_REPO_ID, "abstracts", token=token)

    # full article records
    Dataset.from_pandas(contents_df).push_to_hub(HF_REPO_ID, "articles", token=token)

    # one row per processed arxiv id
    id_rows = [{"arxiv_id": arxiv_id} for arxiv_id in processed_arxiv_ids]
    Dataset.from_list(id_rows).push_to_hub(HF_REPO_ID, "processed_arxiv_ids", token=token)
| ######################################################## | |
| ### MAIN | |
| ######################################################## | |
def main():
    """
    Run one sync: Zotero -> arXiv HTML/PDF text -> Hugging Face dataset

    Steps:
      1. Fetch the Zotero items and keep the ones linking to arXiv.
      2. Skip ids already listed as processed (Hub first, local JSON as
         fallback) and return early when nothing is new.
      3. Parse each new paper from its HTML, falling back to the PDF
         extraction service when the HTML yields no content.
      4. Merge abstracts and article records with what is already on the
         Hub, upload everything, and write local copies under ``data/``.
    """
    # NOTE(review): debug=True stops the Zotero fetch shortly after 300 items;
    # confirm whether the full library should be synced instead.
    items = get_zotero_items(debug=True)
    print(f"# of items fetched from zotero: {len(items)}")

    arxiv_items = get_arxiv_items(items)
    print(f"# of arxiv papers: {len(arxiv_items)}")

    # get already processed arxiv ids from HF, else from the local copy
    try:
        existing_arxiv_ids = load_dataset(HF_REPO_ID, "processed_arxiv_ids")["train"]["arxiv_id"]
    except Exception as e:
        print(e)
        try:
            existing_arxiv_ids = srsly.read_json("data/processed_arxiv_ids.json")
        except Exception as e:
            print(e)
            existing_arxiv_ids = []
    existing_arxiv_ids = set(existing_arxiv_ids)
    print(f"# of existing arxiv ids: {len(existing_arxiv_ids)}")

    # new arxiv items
    arxiv_items = [item for item in arxiv_items if item["arxiv_id"] not in existing_arxiv_ids]
    print(f"# of new arxiv items: {len(arxiv_items)}")
    if not arxiv_items:
        # Nothing to add: skip the download and avoid re-uploading (or
        # building frames without an "arxiv_id" column).
        print("No new arxiv items to process")
        return

    arxiv_items = fetch_arxiv_htmls(arxiv_items)

    processed_arxiv_ids = set()
    for item in arxiv_items:
        try:
            item["contents"] = parse_html_content(item["raw_html"])
            processed_arxiv_ids.add(item["arxiv_id"])
        except Exception as e:
            print(f"Failed to parse html for {item['arxiv_id']}: {e}")
            item["contents"] = []

        if not item["contents"]:
            print("Extracting from pdf...")
            md_content = get_pdf_text(item["arxiv_id"])  # fix this
            if md_content:
                item["contents"] = parse_markdown_content(md_content, item["arxiv_id"])
                processed_arxiv_ids.add(item["arxiv_id"])

    print(f"# of processed arxiv ids: {len(processed_arxiv_ids)}")

    # abstracts: first "abstract" record of every paper
    id_to_abstract = {}
    for item in arxiv_items:
        for entry in item["contents"]:
            if entry["content_type"] == "abstract":
                id_to_abstract[item["arxiv_id"]] = entry["content"]
                break
    print(f"# of abstracts: {len(id_to_abstract)}")

    abstract_df = pd.DataFrame(list(id_to_abstract.items()), columns=["arxiv_id", "abstract"])
    print(abstract_df.head())

    # add to existing dataset
    try:
        old_abstract_df = load_dataset(HF_REPO_ID, "abstracts")["train"].to_pandas()
    except Exception as e:
        print(e)
        old_abstract_df = pd.DataFrame(columns=abstract_df.columns)
    print(old_abstract_df.head())

    abstract_df = pd.concat([old_abstract_df, abstract_df]).reset_index(drop=True)
    abstract_df = abstract_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True)

    # contents
    contents_df = pd.DataFrame(arxiv_items)
    print(contents_df.head())

    try:
        old_contents_df = load_dataset(HF_REPO_ID, "articles")["train"].to_pandas()
    except Exception as e:
        print(e)
        old_contents_df = pd.DataFrame(columns=contents_df.columns)
    if len(old_contents_df) > 0:
        print(old_contents_df.sample().T)

    contents_df = pd.concat([old_contents_df, contents_df]).reset_index(drop=True)
    contents_df = contents_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True)

    # Merge the new ids with the ones processed in earlier runs. Merging the
    # new ids with themselves overwrote the stored list on every upload.
    processed_arxiv_ids = sorted(existing_arxiv_ids | processed_arxiv_ids)

    # upload to hf
    upload_to_hf(abstract_df, contents_df, processed_arxiv_ids)

    # save as local copy
    os.makedirs("data", exist_ok=True)
    abstract_df.to_parquet("data/abstracts.parquet")
    contents_df.to_parquet("data/contents.parquet")
    srsly.write_json("data/processed_arxiv_ids.json", processed_arxiv_ids)
def schedule_periodic_task():
    """
    Run the main task once, then keep running it on a fixed schedule

    The frequency is currently hard-coded to "daily" (10:00); "hourly" runs
    at minute :00 of every hour. This function never returns: it polls the
    scheduler once per second.
    """
    main()  # run once initially

    frequency = "daily"  # TODO: env
    if frequency == "hourly":
        print("Scheduling tasks to run every hour at the top of the hour")
        schedule.every().hour.at(":00").do(main)
    elif frequency == "daily":
        start_time = "10:00"
        # NOTE(review): the message says UTC+00, but no timezone is passed to
        # the scheduler; confirm the host clock is UTC.
        print(f"Scheduling tasks to run every day at: {start_time} UTC+00")
        schedule.every().day.at(start_time).do(main)

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    schedule_periodic_task()