import base64
import os
import shutil
from collections import defaultdict
from datetime import date, datetime, timedelta
from io import BytesIO

import dotenv
import matplotlib.pyplot as plt
import seaborn as sns
from datasets import load_dataset
from dateutil.parser import parse
from dateutil.tz import tzutc
from fasthtml.common import *
from fh_matplotlib import matplotlib2fasthtml
from huggingface_hub import login, whoami

dotenv.load_dotenv()
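
# Minimal CSS overrides for the card grid on the weekly view.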
style = Style("""
.grid { margin-bottom: 1rem; }
.card { display: flex; flex-direction: column; }
.card img { margin-bottom: 0.5rem; }
.card h5 { margin: 0; font-size: 0.9rem; line-height: 1.2; }
.card a { color: inherit; text-decoration: none; }
.card a:hover { text-decoration: underline; }
""")

# delete data folder
if os.path.exists("data"):
    try:
        shutil.rmtree("data")
    except OSError as e:
        print("Error: %s : %s" % ("data", e.strerror))

app, rt = fast_app(html_style=(style,))

login(token=os.environ.get("HF_TOKEN"))
hf_user = whoami(os.environ.get("HF_TOKEN"))["name"]

HF_REPO_ID_TXT = f"{hf_user}/zotero-answer-ai-texts"
HF_REPO_ID_IMG = f"{hf_user}/zotero-answer-ai-images"
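
# Text datasets (abstracts and article metadata) and first-page images live in separate Hub repos.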
abstract_ds = load_dataset(HF_REPO_ID_TXT, "abstracts", split="train")
article_ds = load_dataset(HF_REPO_ID_TXT, "articles", split="train")
image_ds = load_dataset(HF_REPO_ID_IMG, "images_first_page", split="train")


def parse_date(date_string):
    try:
        return parse(date_string).astimezone(tzutc()).date()
    except ValueError:
        return date.today()


def get_week_start(date_obj):
    return date_obj - timedelta(days=date_obj.weekday())
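
# Group each article's arXiv ID under the Monday of the week it was added.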
week2articles = defaultdict(list)
for article in article_ds:
    date_added = parse_date(article["date_added"])
    week_start = get_week_start(date_added)
    week2articles[week_start].append(article["arxiv_id"])

weeks = sorted(week2articles.keys(), reverse=True)
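
# Lookup tables from arXiv ID to the matching article, abstract, and first-page image rows.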
arxiv2article = {article["arxiv_id"]: article for article in article_ds}
arxiv2abstract = {abstract["arxiv_id"]: abstract for abstract in abstract_ds}
arxiv2image = {image["arxiv_id"]: image for image in image_ds}


def get_article_details(arxiv_id):
    article = arxiv2article.get(arxiv_id, {})
    abstract = arxiv2abstract.get(arxiv_id, {})
    image = arxiv2image.get(arxiv_id, {})
    return article, abstract, image
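

# Render one week of articles: prev/next navigation, a week header, and a three-column grid of cards.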
def generate_week_content(current_week):
    week_index = weeks.index(current_week)
    prev_week = weeks[week_index + 1] if week_index < len(weeks) - 1 else None
    next_week = weeks[week_index - 1] if week_index > 0 else None

    nav_buttons = Div(
        Button(
            "← Previous Week",
            hx_get=f"/week/{prev_week}" if prev_week else "#",
            hx_target="#content",
            hx_swap="innerHTML",
            disabled=not prev_week,
        ),
        Button(
            "Next Week →",
            hx_get=f"/week/{next_week}" if next_week else "#",
            hx_target="#content",
            hx_swap="innerHTML",
            disabled=not next_week,
        ),
        A("View Stats", href="/stats", cls="button"),
    )
    articles = week2articles[current_week]
    article_cards = []
    for arxiv_id in articles:
        article, abstract, image = get_article_details(arxiv_id)
        article_title = article["contents"][0].get("paper_title", "article") if article["contents"] else "article"

        card_content = [
            H5(
                A(
                    article_title,
                    href=f"https://arxiv.org/abs/{arxiv_id}",
                    target="_blank",
                )
            )
        ]
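
        # Inline the first-page thumbnail as a base64 JPEG data URI so the card needs no external image hosting.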
        if image:
            pil_image = image["image"]  # image[0]["image"]
            pil_image.thumbnail((500, 500))
            img_byte_arr = BytesIO()
            pil_image.save(img_byte_arr, format="JPEG")
            img_byte_arr = img_byte_arr.getvalue()
            image_url = f"data:image/jpeg;base64,{base64.b64encode(img_byte_arr).decode('utf-8')}"
            card_content.insert(
                0,
                Img(
                    src=image_url,
                    alt="Article image",
                    style="max-width: 100%; height: auto; margin-bottom: 15px;",
                ),
            )

        article_cards.append(Card(*card_content, cls="mb-4"))
    grid = Grid(
        *article_cards,
        style="display: grid; grid-template-columns: repeat(3, 1fr); gap: 1rem;",
    )

    week_end = current_week + timedelta(days=6)

    return Div(
        nav_buttons,
        Br(),
        H5(f"{current_week.strftime('%B %d')} - {week_end.strftime('%B %d, %Y')} ({len(articles)} articles)"),
        Br(),
        grid,
        nav_buttons,
        id="content",
    )
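

# Routes: the index shows the most recent week, /week/{date} returns a single week's grid for HTMX swaps,
# and /stats renders the summary charts.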
@rt("/")
def get():
    return Titled("AnswerAI Zotero Weekly", generate_week_content(weeks[0]))


@rt("/week/{date}")
def get(date: str):
    try:
        current_week = datetime.strptime(date, "%Y-%m-%d").date()
        return generate_week_content(current_week)
    except Exception as e:
        return Div(f"Error displaying articles: {str(e)}")


@rt("/stats")
async def get():
    @matplotlib2fasthtml
    def generate_chart():
        end_date = max(weeks)
        start_date = end_date - timedelta(weeks=11)

        dates = []
        counts = []
        current_date = start_date
        while current_date <= end_date:
            count = len(week2articles[current_date])
            date_str = current_date.strftime("%d-%B-%Y")
            dates.append(date_str)
            counts.append(count)
            current_date += timedelta(weeks=1)

        plt.figure(figsize=(12, 6))
        sns.set_style("darkgrid")
        # sns.set_palette("deep")
        ax = sns.barplot(x=dates, y=counts)
        plt.title("Papers per Week (Last 12 Weeks)", fontsize=16, fontweight="bold")
        plt.xlabel("Week", fontsize=12)
        plt.ylabel("Number of Papers", fontsize=12)

        # Rotate and align the tick labels so they look better
        plt.xticks(rotation=45, ha="right")

        # Use a tight layout to prevent the labels from being cut off
        plt.tight_layout()

        # Add value labels on top of each bar
        for i, v in enumerate(counts):
            ax.text(i, v + 0.5, str(v), ha="center", va="bottom")

        # Increase y-axis limit slightly to accommodate the value labels
        plt.ylim(0, max(counts) * 1.1)
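
    # Horizontal bar chart of how many papers each user has added, largest contributors at the top.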
    @matplotlib2fasthtml
    def generate_contributions_chart():
        article_df = article_ds.data.to_pandas()
        added_by_df = article_df.groupby("added_by").size().reset_index(name="count")
        added_by_df = added_by_df.sort_values("count", ascending=False)  # Largest contributors appear at the top

        plt.figure(figsize=(12, 8))
        sns.set_style("darkgrid")
        sns.set_palette("deep")
        ax = sns.barplot(x="count", y="added_by", data=added_by_df)
        plt.title("Upload Counts", fontsize=16, fontweight="bold")
        plt.xlabel("Number of Articles Added", fontsize=12)
        plt.ylabel("User", fontsize=12)

        # Add value labels to the end of each bar
        for i, v in enumerate(added_by_df["count"]):
            ax.text(v + 0.5, i, str(v), va="center")

        # Adjust x-axis to make room for labels
        plt.xlim(0, max(added_by_df["count"]) * 1.1)

        plt.tight_layout()

    # chart = Div(generate_chart(), id="chart")
    bar_chart = Div(generate_chart(), id="bar-chart")
    pie_chart = Div(generate_contributions_chart(), id="pie-chart")

    # add contributions
    article_df = article_ds.data.to_pandas()
    added_by_df = article_df.groupby("added_by").size().reset_index(name="count")
    added_by_df = added_by_df.sort_values("count", ascending=False)
    return Titled(
        "AnswerAI Zotero Stats",
        H5("Papers per Week (Last 12 Weeks)"),
        bar_chart,
        Br(),
        H5("Contributions by User"),
        pie_chart,
        Br(),
        A("Back to Weekly View", href="/", cls="button"),
    )


# serve()
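# When run directly (e.g. on a Space), bind uvicorn to the PORT env var, defaulting to 7860.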
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))