Spaces:
Running
Running
import streamlit as st | |
import pandas as pd | |
import os | |
from huggingface_hub import HfApi, hf_hub_download | |
# Hugging Face dataset repo the annotations CSV is downloaded from and uploaded to.
HF_REPO = "giobin/MAIA_human_assessment_annotations"
# Name of the annotations CSV file inside that repo.
CSV_FILENAME = "user_selections.csv"
def assign_samples(csv_path, *, samples_per_group: int = 50) -> dict:
    """Build the annotator -> samples assignment from the samples CSV.

    The CSV is read with pandas and must contain the columns ``pool_pos``
    and ``question_category``. For each pool position 1, 2 and 3, the first
    ``samples_per_group`` rows (in file order) whose category does not end
    in ``"_B"`` form one group; each annotator is mapped to one group.

    Args:
        csv_path: Location of the samples CSV, passed to ``pandas.read_csv``.
        samples_per_group: Maximum number of rows kept per pool position.

    Returns:
        Dict mapping annotator name to the DataFrame of rows assigned to
        them. Annotators of the same group share the same DataFrame object.
    """
    df = pd.read_csv(csv_path)

    # na=False: a row with a missing category is treated as "not a _B
    # question" instead of putting NaN into the mask, which would make the
    # boolean negation below fail.
    is_b_category = df["question_category"].str.endswith("_B", na=False)
    eligible = df[~is_b_category]

    def pool(position):
        """Return the first rows of ``eligible`` at the given pool position."""
        return eligible[eligible["pool_pos"] == position].head(samples_per_group)

    group_1, group_2, group_3 = pool(1), pool(2), pool(3)
    return {
        "Bernardo": group_1,
        "Alessandro": group_1,
        "Alessio": group_1,
        "Lenci": group_2,
        "Lucia": group_2,
        "Davide": group_2,
        "Giovanni": group_3,
        "Raffaella": group_3,
    }
def load_existing_annotations():
    """Load the annotations already stored in the HF dataset repo.

    Downloads ``CSV_FILENAME`` from ``HF_REPO`` (authenticating with the
    ``HF_TOKEN`` Streamlit secret) and parses it with pandas.

    Returns:
        DataFrame with the stored annotations, or an empty DataFrame with
        the columns ``username`` and ``id`` when the file could not be
        downloaded or parsed.
    """
    try:
        file_path = hf_hub_download(HF_REPO, CSV_FILENAME, repo_type="dataset", token=st.secrets["HF_TOKEN"])
        return pd.read_csv(file_path)
    except Exception as e:
        # Still best effort (a missing file is expected before the first
        # upload), but report the cause: an empty result here means a later
        # upload will not contain the previously stored annotations.
        print(f"Could not load existing annotations from HF: {e}")
        return pd.DataFrame(columns=["username", "id"])
# Samples file, per-annotator assignment and annotations stored so far
csv_file = "static/mc.csv"
assignments = assign_samples(csv_file)
existing_annotations = load_existing_annotations()
valid_users = list(assignments)

# Seed the session state with defaults for any key that is not set yet
_session_defaults = (
    ("username", None),
    ("index", 0),
    ("results", []),
)
for _state_key, _initial_value in _session_defaults:
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
def update_name():
    """Adopt the name chosen in the select box and restart at the first sample."""
    state = st.session_state
    state.username = state.selected_user
    # Progress always restarts from the first remaining sample
    state.index = 0
# No annotator chosen yet: show the name form and halt this run
if st.session_state.username is None:
    with st.form("user_form"):
        st.write("### Select Your Name")
        selected_user = st.selectbox("Choose your name:", valid_users, key="selected_user")
        submit_button = st.form_submit_button("Start", on_click=update_name)
    st.stop()

# Samples assigned to the annotator, minus the ones they already labeled
current_user = st.session_state.username
full_dataset = assignments[current_user].reset_index(drop=True)
rows_of_current_user = existing_annotations["username"] == current_user
user_labeled_ids = existing_annotations.loc[rows_of_current_user, "id"].tolist()
already_labeled = full_dataset["id"].isin(user_labeled_ids)
dataset = full_dataset[~already_labeled].reset_index(drop=True)

# Nothing left to annotate: show the completion message and halt this run
if dataset.empty:
    st.write("### Great! You have completed your assignment. π")
    st.stop()
def push_to_hf_hub(csv_path):
    """Upload the CSV at ``csv_path`` to the HF dataset repo as ``CSV_FILENAME``.

    The repo is created first if it does not exist yet. Errors are not
    raised to the caller: any exception is caught and printed.
    """
    api = HfApi()
    try:
        # Looked up inside the try so a missing secret is reported like any other failure
        hf_token = st.secrets["HF_TOKEN"]
        api.create_repo(HF_REPO, repo_type="dataset", exist_ok=True, token=hf_token)
        api.upload_file(
            path_or_fileobj=csv_path,
            path_in_repo=CSV_FILENAME,
            repo_id=HF_REPO,
            repo_type="dataset",
            token=hf_token,
        )
        print(f"Dataset updated: https://huggingface.co/datasets/{HF_REPO}")
    except Exception as e:
        print(f"Error pushing to HF: {e}")
def save_choice(choice_index):
    """Record the answer for the current sample and move to the next one.

    Used as the ``on_click`` callback of the two answer buttons. The
    answer is appended to ``st.session_state.results``, the sample index
    is advanced and the checkbox is cleared. Once the last remaining
    sample has been answered, the collected results are merged with the
    previously stored annotations, written to ``CSV_FILENAME``, pushed to
    the HF dataset repo, and the script run is stopped.

    Args:
        choice_index: 0 when answer A was chosen, 1 when answer B was chosen.
    """
    sample = dataset.iloc[st.session_state.index]

    # Read the checkbox through its widget key instead of the module-level
    # ``not_enough_info`` variable, which is only assigned further down the
    # script, after this function is defined.
    flagged_not_enough_info = bool(st.session_state.get("checkbox", False))

    st.session_state.results.append({
        "username": st.session_state.username,
        "id": sample["id"],
        "video_id": sample["video_id"],
        "answer1": sample["answer1"],
        "answer2": sample["answer2"],
        "selected_answer": choice_index,
        "target": sample["target"],
        "not_enough_info": flagged_not_enough_info,
    })
    st.session_state.index += 1
    st.session_state.checkbox = False  # reset the checkbox for the next sample

    if st.session_state.index < len(dataset):
        return  # more samples left; nothing is persisted yet

    # All remaining samples done: persist everything collected in this session
    st.write("### Great! You have completed your assignment. π")
    result_df = pd.DataFrame(st.session_state.results)
    if not existing_annotations.empty:
        # keep="last": an answer given in this session replaces a stored one
        # for the same (username, id) pair
        result_df = pd.concat([existing_annotations, result_df]).drop_duplicates(
            subset=["username", "id"], keep="last"
        )
    result_df.to_csv(CSV_FILENAME, index=False)
    push_to_hf_hub(CSV_FILENAME)
    st.stop()
# Sample shown in this run
sample = dataset.iloc[st.session_state.index]

# Page header: centered title, then the annotator's name
st.markdown("<h1 style='text-align: center; font-size: 50px;'>MAIA Sample</h1>", unsafe_allow_html=True)
st.markdown(f"<h3 style='text-align: center;'>User: {st.session_state.username}</h3>", unsafe_allow_html=True)
st.write("\n\n")  # vertical spacing

# Annotation instructions
st.markdown("""
### Instructions:
- Look at the video thumbnail, do not play it!
- Select the checkbox if you think so.
- Then choose one of the two answers.
""")
st.write("---")  # horizontal divider

# Video of the current sample
st.video(sample["video_url"])

# Question prompt
st.markdown("<h4 style='text-align: center; margin-top: 20px;'>Scegli la descrizione corretta tra A e B</h4>", unsafe_allow_html=True)

# Checkbox for flagging that the frame is not informative enough
st.markdown("<div style='text-align: center;'>", unsafe_allow_html=True)
not_enough_info = st.checkbox("The frame does not provide enough information to answer the question.", key='checkbox')
st.markdown("</div>", unsafe_allow_html=True)
st.write("\n")  # spacing before the buttons

# One button per candidate answer, side by side; clicking records the choice
label_a = f"A: {sample.get('answer1', 'No answer1 available')}"
label_b = f"B: {sample.get('answer2', 'No answer2 available')}"
col1, col2 = st.columns(2)
with col1:
    st.button(label_a, on_click=save_choice, args=(0,))
with col2:
    st.button(label_b, on_click=save_choice, args=(1,))