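# Hugging Face Hub page header captured together with this file (not program code):
# uploaded by mariagrandury; last commit "combine participant info" (f8d385e);
# file size 15.6 kB.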
import json
import os
import time
from collections import defaultdict
from functools import lru_cache
import argilla as rg
import gradio as gr
import pandas as pd
from dotenv import load_dotenv
from fastapi import FastAPI
# Pull settings from a local .env file (python-dotenv) before reading os.environ.
load_dotenv()

# Input data locations and the name of the generated private leaderboard file.
DATA_DIR = "data"
PARTICIPANTS_CSV = os.path.join(DATA_DIR, "participants.csv")
LEADERBOARD_PERSONAL_CSV = "leaderboard_personal.csv"

# Short internal field name -> column header used in the participants CSV.
COLUMN_MAP = dict(
    gmail="Dirección de correo electrónico",
    discord="¿Cuál es tu nombre en Discord?",
    hf_username="¿Cuál es tu nombre en el Hub de Hugging Face?",
    contact_email="Email de contacto",
)

# Argilla client. It stays None when construction fails, which the Argilla
# reader in this file treats as "no Argilla data available".
client = None
try:
    client = rg.Argilla(
        api_url=os.getenv("ARGILLA_API_URL", ""),
        api_key=os.getenv("ARGILLA_API_KEY", ""),
    )
except Exception as e:
    print(f"Error initializing Argilla client: {e}")
# Country display name -> {"iso": three-letter code, "emoji": flag}.
# Built from a compact (name, iso, emoji) table; insertion order is the table order.
countries = {
    name: {"iso": iso, "emoji": emoji}
    for name, iso, emoji in (
        ("Argentina", "ARG", "🇦🇷"),
        ("Bolivia", "BOL", "🇧🇴"),
        ("Chile", "CHL", "🇨🇱"),
        ("Colombia", "COL", "🇨🇴"),
        ("Costa Rica", "CRI", "🇨🇷"),
        ("Cuba", "CUB", "🇨🇺"),
        ("Ecuador", "ECU", "🇪🇨"),
        ("El Salvador", "SLV", "🇸🇻"),
        ("España", "ESP", "🇪🇸"),
        ("Guatemala", "GTM", "🇬🇹"),
        ("Honduras", "HND", "🇭🇳"),
        ("México", "MEX", "🇲🇽"),
        ("Nicaragua", "NIC", "🇳🇮"),
        ("Panamá", "PAN", "🇵🇦"),
        ("Paraguay", "PRY", "🇵🇾"),
        ("Perú", "PER", "🇵🇪"),
        ("Puerto Rico", "PRI", "🇵🇷"),
        ("República Dominicana", "DOM", "🇩🇴"),
        ("Uruguay", "URY", "🇺🇾"),
        ("Venezuela", "VEN", "🇻🇪"),
    )
}
@lru_cache(maxsize=1)
def get_user_mapping():
    """Build lookup tables from e-mail / Hub username to Discord username.

    Reads ``PARTICIPANTS_CSV`` and, for every row that has a Discord name,
    stores the lower-cased Discord name under the lower-cased Gmail address
    and under the lower-cased Hugging Face username (when those cells are
    filled in).

    Returns:
        tuple[dict, dict]: ``(email_to_discord, hf_to_discord)``. Both dicts
        are empty when the file does not exist or cannot be read.

    Note:
        The result, including an empty one, is memoised by ``lru_cache``
        until ``get_user_mapping.cache_clear()`` is called. The returned
        dicts are the cached objects themselves, so callers must not
        mutate them.
    """
    if not os.path.exists(PARTICIPANTS_CSV):
        return {}, {}

    try:
        # Read every column as text: a column holding only digit-like
        # usernames would otherwise be parsed as numbers, and calling
        # .lower() on them would abort the whole mapping. Blank cells
        # become "" so that they can be skipped with a truthiness test.
        df = pd.read_csv(PARTICIPANTS_CSV, dtype=str).fillna("")

        email_to_discord = {}
        hf_to_discord = {}

        for row in df.to_dict("records"):
            discord = row.get(COLUMN_MAP["discord"], "")
            if not discord or discord == "NA":
                continue
            discord_lower = discord.lower()

            # Map email to discord
            gmail = row.get(COLUMN_MAP["gmail"], "")
            if gmail:
                email_to_discord[gmail.lower()] = discord_lower

            # Map hf_username to discord
            hf_username = row.get(COLUMN_MAP["hf_username"], "")
            if hf_username:
                hf_to_discord[hf_username.lower()] = discord_lower

        return email_to_discord, hf_to_discord
    except Exception as e:
        # Best effort: the leaderboard still works without the mapping.
        print(f"Error loading {PARTICIPANTS_CSV}: {e}")
        return {}, {}
def get_discord_username(identifier):
    """Translate an e-mail address or Hub username into a Discord username.

    Identifiers containing "@" are looked up (lower-cased) in the e-mail
    table and fall back to the part before the first "@". Anything else is
    looked up (lower-cased) in the Hub-username table and falls back to the
    identifier unchanged.
    """
    by_email, by_hf_name = get_user_mapping()
    lookup_key = identifier.lower()

    if "@" not in identifier:
        return by_hf_name.get(lookup_key, identifier)

    local_part = identifier.split("@")[0]
    return by_email.get(lookup_key, local_part)
def get_participant_info():
    """Load the per-participant contact details from ``PARTICIPANTS_CSV``.

    Returns:
        dict: lower-cased Discord username -> dict with the keys ``gmail``,
        ``discord_username`` (original casing), ``hf_username`` and
        ``email``. Blank cells are returned as ``""``. The dict is empty
        when the file does not exist or cannot be read. When several rows
        share a Discord name, the last row wins.
    """
    if not os.path.exists(PARTICIPANTS_CSV):
        return {}

    try:
        # Read everything as text so numeric-looking usernames keep their
        # spelling and support .lower(); blank cells become "" instead of
        # NaN so that they do not leak into the leaderboard as NaN.
        df = pd.read_csv(PARTICIPANTS_CSV, dtype=str).fillna("")

        participant_info = {}
        for row in df.to_dict("records"):
            discord_username = row.get(COLUMN_MAP["discord"], "")
            if not discord_username or discord_username == "NA":
                continue
            participant_info[discord_username.lower()] = {
                "gmail": row.get(COLUMN_MAP["gmail"], ""),
                "discord_username": discord_username,
                "hf_username": row.get(COLUMN_MAP["hf_username"], ""),
                "email": row.get(COLUMN_MAP["contact_email"], ""),
            }
        return participant_info
    except Exception as e:
        # Best effort: contact details are optional for the leaderboard.
        print(f"Error loading participant info: {e}")
        return {}
def get_blend_es_data():
    """Count ``answer_1`` responses per user across the country datasets.

    For every entry in ``countries`` the Argilla dataset named
    ``"<emoji> <country> - <iso> - Responder"`` is read and each response to
    the ``answer_1`` question that carries a user id is counted for that
    user. Argilla usernames are then translated with
    ``get_discord_username``.

    Returns:
        list[dict]: one ``{"source": "blend-es", "username": ..., "count": ...}``
        entry per (dataset, user). Empty when no Argilla client is
        available. A dataset that fails to load is reported on stdout and
        skipped; the remaining datasets are still processed.
    """
    if not client:
        return []

    data = []
    # Shared across datasets so each Argilla user is fetched at most once.
    usernames_by_id = {}

    for country, info in countries.items():
        dataset_name = f"{info['emoji']} {country} - {info['iso']} - Responder"

        try:
            dataset = client.datasets(dataset_name)
            user_counts = defaultdict(int)

            for record in dataset.records(with_responses=True):
                if "answer_1" not in record.responses:
                    continue
                for answer in record.responses["answer_1"]:
                    user_id = answer.user_id
                    if not user_id:
                        continue
                    user_counts[user_id] += 1

                    if user_id in usernames_by_id:
                        continue
                    try:
                        user = client.users(id=user_id)
                        usernames_by_id[user_id] = user.username
                    except Exception:
                        # NOTE(review): the Argilla SDK types user ids as
                        # UUID objects, which cannot be sliced directly;
                        # go through str() so the placeholder name never
                        # raises.
                        usernames_by_id[user_id] = f"User-{str(user_id)[:8]}"

            for user_id, count in user_counts.items():
                # Every counted id was resolved (or given a placeholder) above.
                username = get_discord_username(usernames_by_id[user_id])
                data.append(
                    {"source": "blend-es", "username": username, "count": count}
                )

        except Exception as e:
            # Best effort per dataset: report and continue with the next one.
            print(f"Error processing {dataset_name}: {e}")

    return data
def get_include_data():
    """Sum the INCLUDE question counts per user from ``include.csv``.

    Reads ``<DATA_DIR>/include.csv`` and adds up the
    "Total preguntas hackathon" column per lower-cased username taken from
    the "Nombre en Discord / username" column. Surrounding whitespace and a
    single leading "@" are removed from the username. Rows with a blank
    username, or with a blank, non-numeric or zero count, are ignored.

    Returns:
        list[dict]: one ``{"source": "include", "username": ..., "count": ...}``
        entry per user, in order of first appearance. Empty when the file
        or one of the two columns is missing, or when the file cannot be
        read.
    """
    csv_path = os.path.join(DATA_DIR, "include.csv")
    if not os.path.exists(csv_path):
        return []

    try:
        df = pd.read_csv(csv_path)
        username_col = "Nombre en Discord / username"
        questions_col = "Total preguntas hackathon"

        if username_col not in df.columns or questions_col not in df.columns:
            return []

        # Coerce instead of int(): one malformed cell should drop that row,
        # not the whole file.
        question_counts = pd.to_numeric(df[questions_col], errors="coerce")

        user_counts = defaultdict(int)
        for raw_name, raw_count in zip(df[username_col], question_counts):
            if pd.isna(raw_name) or pd.isna(raw_count):
                continue

            username = str(raw_name).strip()
            # Only drop the first character when it really is the "@"
            # prefix; names entered without it keep their first letter.
            if username.startswith("@"):
                username = username[1:]

            questions = int(raw_count)
            if username and questions:
                user_counts[username.lower()] += questions

        return [
            {"source": "include", "username": username, "count": count}
            for username, count in user_counts.items()
        ]
    except Exception as e:
        print(f"Error loading include data: {e}")
        return []
def get_estereotipos_data():
    """Sum the stereotype contribution counts per user from ``stereotypes.csv``.

    Reads ``<DATA_DIR>/stereotypes.csv`` and adds up the ``count`` column per
    lower-cased ``token_id``. Each identifier is then translated with
    ``get_discord_username``. Rows with a blank identifier or a blank or
    non-numeric count are ignored.

    Returns:
        list[dict]: one
        ``{"source": "estereotipos", "username": ..., "count": ...}`` entry
        per identifier, in order of first appearance. Empty when the file
        or one of the two columns is missing, or when the file cannot be
        read.
    """
    csv_path = os.path.join(DATA_DIR, "stereotypes.csv")
    if not os.path.exists(csv_path):
        return []

    try:
        df = pd.read_csv(csv_path)
        if "token_id" not in df.columns or "count" not in df.columns:
            return []

        # Coerce instead of int(): one malformed cell should drop that row,
        # not the whole file.
        counts = pd.to_numeric(df["count"], errors="coerce")

        user_counts = defaultdict(int)
        for token, count in zip(df["token_id"], counts):
            if pd.isna(token) or pd.isna(count):
                continue
            user_counts[str(token).lower()] += int(count)

        return [
            {
                # Must be "estereotipos": the consolidation step routes
                # counts by this label, and "include" would add them to
                # the INCLUDE column instead.
                "source": "estereotipos",
                "username": get_discord_username(mail),
                "count": count,
            }
            for mail, count in user_counts.items()
        ]
    except Exception as e:
        print(f"Error loading estereotipos data: {e}")
        return []
def get_arena_data():
    """Count arena conversations per user from ``arena.json``.

    The JSON document is read as a mapping whose values are lists of
    conversation objects. Every conversation with a non-empty ``username``
    adds one to that (lower-cased) user's tally, and each name is then
    translated with ``get_discord_username``.

    Returns a list of ``{"source": "arena", "username": ..., "count": ...}``
    dicts, or an empty list when the file is missing or cannot be processed.
    """
    json_path = os.path.join(DATA_DIR, "arena.json")
    if not os.path.exists(json_path):
        return []

    try:
        with open(json_path, "r", encoding="utf-8") as f:
            arena_data = json.load(f)

        tally = defaultdict(int)
        for conversations in arena_data.values():
            for conversation in conversations:
                name = conversation.get("username")
                if name:
                    tally[name.lower()] += 1

        rows = []
        for name, total in tally.items():
            rows.append(
                {
                    "source": "arena",
                    "username": get_discord_username(name),
                    "count": total,
                }
            )
        return rows
    except Exception as e:
        print(f"Error loading arena data: {e}")
        return []
def consolidate_all_data():
    """Merge every contribution source into one per-user leaderboard.

    Entries from the four readers are grouped by lower-cased username, and
    participant contact details are attached when the username is known.

    Side effect: the full table, including contact columns, is written to
    ``LEADERBOARD_PERSONAL_CSV``.

    Returns:
        pandas.DataFrame: the public table (Username, Arena, Blend-ES,
        Estereotipos, INCLUDE), sorted by the Arena column in descending
        order when it is not empty.
    """
    # Gather the raw entries first, then the participant directory.
    entries = (
        get_blend_es_data()
        + get_include_data()
        + get_estereotipos_data()
        + get_arena_data()
    )
    participant_info = get_participant_info()

    def blank_entry():
        return {
            "username": "",
            "gmail": "",
            "discord_username": "",
            "hf_username": "",
            "email": "",
            "blend_es": 0,
            "include": 0,
            "estereotipos": 0,
            "arena": 0,
        }

    # Source label carried by each entry -> counter field it feeds.
    counter_for_source = {
        "blend-es": "blend_es",
        "include": "include",
        "estereotipos": "estereotipos",
        "arena": "arena",
    }

    user_contributions = defaultdict(blank_entry)
    for item in entries:
        source = item["source"]
        username = item["username"]
        count = item["count"]

        user_key = username.lower()
        entry = user_contributions[user_key]

        if not entry["username"]:
            entry["username"] = username

        if user_key in participant_info:
            info = participant_info[user_key]
            for field in ("gmail", "discord_username", "hf_username", "email"):
                entry[field] = info[field]

        counter = counter_for_source.get(source)
        if counter is not None:
            entry[counter] += count

    # One private row (with contact details) and one public row per user.
    full_rows = []
    display_rows = []
    for data in user_contributions.values():
        scores = {
            "Arena": data["arena"],
            "Blend-ES": data["blend_es"],
            "Estereotipos": data["estereotipos"],
            "INCLUDE": data["include"],
        }
        contact = {
            "Gmail": data["gmail"],
            "Discord_Username": data["discord_username"],
            "HF_Username": data["hf_username"],
            "Email": data["email"],
        }
        full_rows.append({"Username": data["username"], **contact, **scores})
        display_rows.append({"Username": data["username"], **scores})

    # Private table goes to disk.
    full_df = pd.DataFrame(full_rows)
    if not full_df.empty:
        full_df = full_df.sort_values("Arena", ascending=False)
    full_df.to_csv(LEADERBOARD_PERSONAL_CSV, index=False, encoding="utf-8")

    # Public table goes back to the UI.
    display_df = pd.DataFrame(display_rows)
    if not display_df.empty:
        display_df = display_df.sort_values("Arena", ascending=False)
    return display_df
# FastAPI application object; the Gradio UI is mounted onto it near the end
# of this file and it is what uvicorn serves.
app = FastAPI()
# Module-level cache shared by create_leaderboard_ui() and refresh_data():
#   last_update_time - time.time() value of the last rebuild (0 = never built)
#   cached_data      - DataFrame produced by the last rebuild (None = no cache)
last_update_time = 0
cached_data = None
def create_leaderboard_ui():
    """Render the leaderboard as an HTML fragment.

    The consolidated table is rebuilt only when nothing is cached or the
    cached copy is at least 300 seconds old; otherwise the cached DataFrame
    is reused. A non-empty table gets a 1-based ``Rank`` column before it is
    converted to HTML. The returned string holds the last-update timestamp,
    the table styles and the table itself.
    """
    global cached_data, last_update_time

    now = time.time()
    cache_is_fresh = cached_data is not None and now - last_update_time < 300
    if cache_is_fresh:
        df = cached_data
    else:
        df = consolidate_all_data()
        cached_data = df
        last_update_time = now

    if not df.empty:
        # Work on a re-indexed copy so the cached frame keeps its own index.
        ranked = df.reset_index(drop=True)
        ranked.index = ranked.index + 1
        df = ranked.rename_axis("Rank").reset_index()

    df_html = df.to_html(classes="leaderboard-table", border=0, index=False)
    updated_at = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(last_update_time))

    return f"""
<div style="margin: 20px 0;">
<p>Última Actualización: {updated_at}</p>
<style>
.leaderboard-table {{
width: 100%;
border-collapse: collapse;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
box-shadow: 0 4px 8px rgba(0,0,0,0.1);
border-radius: 8px;
overflow: hidden;
}}
.leaderboard-table th {{
background-color: #1a1a2e;
color: white;
font-weight: bold;
text-align: left;
padding: 14px;
border-bottom: 2px solid #16213e;
}}
.leaderboard-table td {{
padding: 12px 14px;
border-bottom: 1px solid #333;
background-color: #222;
color: #fff;
}}
.leaderboard-table tr:hover td {{
background-color: #2a2a3a;
}}
.leaderboard-table tr:nth-child(1) td:first-child {{
background-color: #ffd700;
color: #333;
font-weight: bold;
text-align: center;
border-right: 1px solid #333;
}}
.leaderboard-table tr:nth-child(2) td:first-child {{
background-color: #c0c0c0;
color: #333;
font-weight: bold;
text-align: center;
border-right: 1px solid #333;
}}
.leaderboard-table tr:nth-child(3) td:first-child {{
background-color: #cd7f32;
color: #333;
font-weight: bold;
text-align: center;
border-right: 1px solid #333;
}}
.leaderboard-table tr:nth-child(1) td:nth-child(2) {{
font-weight: bold;
color: #ffd700;
}}
.leaderboard-table tr:nth-child(2) td:nth-child(2) {{
font-weight: bold;
color: #c0c0c0;
}}
.leaderboard-table tr:nth-child(3) td:nth-child(2) {{
font-weight: bold;
color: #cd7f32;
}}
</style>
{df_html}
</div>
"""
def refresh_data():
    """Drop the cached leaderboard and return freshly rendered HTML."""
    global cached_data, last_update_time

    # Clearing both values makes the next render rebuild the table.
    cached_data, last_update_time = None, 0
    return create_leaderboard_ui()
# Gradio UI: a title, the leaderboard HTML (rendered by calling
# create_leaderboard_ui) and a button that re-renders it on click.
with gr.Blocks(theme=gr.themes.Default()) as demo, gr.Column(scale=1):
    gr.Markdown("# 🏆 Leaderboard Personal Retos Hackathon 2025")
    leaderboard_html = gr.HTML(create_leaderboard_ui)
    refresh_btn = gr.Button("🔄 Actualizar Datos", variant="primary")
    refresh_btn.click(fn=refresh_data, outputs=leaderboard_html)

# Serve the Gradio UI from the root path of the FastAPI app.
gr.mount_gradio_app(app, demo, path="/")

if __name__ == "__main__":
    # Imported here so uvicorn is only needed when run as a script.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)