Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| from datetime import datetime | |
| import os | |
| from huggingface_hub import HfApi, upload_file, list_repo_files, hf_hub_download | |
# Where poll data is persisted: the Hugging Face Space repository, the folder
# inside it that holds archived summaries, and the CSV shared by all users
# for the poll currently in progress.
REPO_ID = "MarcosRodrigo/Breakfast-Poll"
HISTORY_DIR = "history"
TEMP_FILE = "current_selections.csv"

# Hugging Face API access. The token is read from the Streamlit secrets store
# under "HF_TOKEN" (it needs write access, since the app uploads files).
hf_token = st.secrets["HF_TOKEN"]
api = HfApi()

# Seed every session-state entry the app reads, without clobbering values
# that already exist. The tuple is rebuilt on every script run, so each new
# session receives its own fresh list objects.
for _key, _default in (
    ("users", []),
    ("current_selections", []),
    ("step", 1),
    ("history", []),
):
    if _key not in st.session_state:
        st.session_state[_key] = _default
def load_current_selections():
    """Return the selections stored in the shared CSV file.

    Returns:
        The contents of ``TEMP_FILE`` as a DataFrame, or an empty DataFrame
        with the columns ``Name``, ``Drinks`` and ``Food`` when the file does
        not exist.
    """
    if not os.path.exists(TEMP_FILE):
        return pd.DataFrame(columns=["Name", "Drinks", "Food"])
    return pd.read_csv(TEMP_FILE)
def save_current_selection_to_file(current_selections):
    """Merge new selections into the shared CSV file.

    List-valued ``Drinks`` and ``Food`` cells are flattened to comma-separated
    text. The rows are then appended to whatever ``TEMP_FILE`` already
    contains, exact duplicate rows are dropped, and the file is rewritten.

    Args:
        current_selections: DataFrame with ``Name``, ``Drinks`` and ``Food``
            columns. It is not modified; the function works on a copy.
    """

    def _as_text(value):
        # Multiselect answers arrive as lists; rows read back from the CSV
        # are already plain text and pass through unchanged.
        return ", ".join(value) if isinstance(value, list) else value

    # Work on a copy so the caller's DataFrame keeps its original list cells.
    new_rows = current_selections.copy()
    for column in ("Drinks", "Food"):
        new_rows[column] = new_rows[column].apply(_as_text)

    if os.path.exists(TEMP_FILE):
        existing_rows = pd.read_csv(TEMP_FILE)
        combined = pd.concat(
            [existing_rows, new_rows], ignore_index=True
        ).drop_duplicates()
    else:
        combined = new_rows
    combined.to_csv(TEMP_FILE, index=False)
def upload_temp_file_to_repo():
    """Push the shared CSV file to the Space repository.

    Does nothing when ``TEMP_FILE`` does not exist locally. The file is
    uploaded under the same path it has on disk.
    """
    if not os.path.exists(TEMP_FILE):
        return
    upload_file(
        repo_id=REPO_ID,
        repo_type="space",
        token=hf_token,
        path_or_fileobj=TEMP_FILE,
        path_in_repo=TEMP_FILE,
    )
def delete_file_from_repo(filename):
    """Delete one file (e.g. ``current_selections.csv``) from the Space repository.

    Args:
        filename: Path of the file inside the repository.
    """
    repo_target = {"repo_id": REPO_ID, "repo_type": "space", "token": hf_token}
    api.delete_file(path_in_repo=filename, **repo_target)
def download_temp_file_from_repo():
    """Refresh the local shared CSV from the Space repository.

    The download is best effort. When it fails for any reason, an existing
    local copy is kept as it is, so a transient error cannot wipe selections
    that were already saved locally. An empty CSV with the ``Name``,
    ``Drinks`` and ``Food`` columns is created only when there is no local
    file at all, so that later reads always find a file.
    """
    try:
        hf_hub_download(
            repo_id=REPO_ID,
            filename=TEMP_FILE,
            repo_type="space",
            token=hf_token,
            local_dir=".",
        )
    except Exception:
        # Deliberately broad: the app should keep working whatever went wrong
        # with the download. Only fall back to a placeholder when nothing
        # usable exists locally.
        if not os.path.exists(TEMP_FILE):
            pd.DataFrame(columns=["Name", "Drinks", "Food"]).to_csv(
                TEMP_FILE, index=False
            )
def load_history():
    """Download and parse every archived summary stored in the repository.

    History files are the ``.txt`` files under ``HISTORY_DIR/``; each one is
    parsed as CSV.

    Returns:
        A list of ``{"Date": <file name without ".txt">, "Summary": DataFrame}``
        dicts, ordered by repository path. File names are timestamps, so this
        is oldest first, which is the order the History view expects when it
        iterates the list in reverse.
    """
    history = []
    prefix = f"{HISTORY_DIR}/"
    files_in_repo = list_repo_files(REPO_ID, token=hf_token, repo_type="space")
    # Sort explicitly instead of relying on whatever order the listing uses.
    history_files = sorted(
        path
        for path in files_in_repo
        if path.startswith(prefix) and path.endswith(".txt")
    )
    for repo_path in history_files:
        local_filepath = hf_hub_download(
            repo_id=REPO_ID, filename=repo_path, token=hf_token, repo_type="space"
        )
        summary_df = pd.read_csv(local_filepath)
        date = repo_path.split("/")[-1].split(".txt")[0]
        history.append({"Date": date, "Summary": summary_df})
    return history
def save_summary_to_history():
    """Archive the current selections as a timestamped file in the history folder.

    The local history directory is created if needed. When ``TEMP_FILE``
    exists, its contents are written to ``HISTORY_DIR/<timestamp>.txt`` and
    that file is uploaded to the repository under the same path.

    Returns:
        The timestamp string (``YYYY-MM-DD_HH-MM-SS``). It is returned even
        when ``TEMP_FILE`` was missing and nothing was archived.
    """
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    archive_path = f"{HISTORY_DIR}/{stamp}.txt"

    if not os.path.exists(HISTORY_DIR):
        os.makedirs(HISTORY_DIR)

    if not os.path.exists(TEMP_FILE):
        return stamp

    pd.read_csv(TEMP_FILE).to_csv(archive_path, index=False)
    upload_file(
        path_or_fileobj=archive_path,
        path_in_repo=archive_path,
        repo_id=REPO_ID,
        token=hf_token,
        repo_type="space",
    )
    return stamp
# Load persistent history and the shared selections once per session.
# A dedicated flag is used because the "history" key is always present by the
# time this runs (session state is seeded earlier in the script), so testing
# for "history" would never trigger the load.
if "initial_load_done" not in st.session_state:
    download_temp_file_from_repo()
    st.session_state.history = load_history()
    st.session_state.current_selections = load_current_selections().to_dict(orient="records")
    # Set last, so a failed load is retried on the next script run.
    st.session_state.initial_load_done = True

# Sidebar for navigating through the different views
menu = st.sidebar.selectbox("Select View", ["Poll", "Current", "History"])
def reset_selections():
    """Empty the user names and selections held in this session's state."""
    for state_key in ("users", "current_selections"):
        st.session_state[state_key] = []
# Poll view: three consecutive steps (name, drinks, food). A later step is
# shown only once the previous one has been completed.
if menu == "Poll":
    st.title("Breakfast Poll Application")

    # Step 1: user's name
    st.header("Step 1: Enter your name")
    name = st.text_input("Name:")
    if st.button("Next", key="step1_next") and name:
        st.session_state.users.append(name)
        st.session_state.step = 2  # Make step 2 visible

    # Step 2: drinks (only after step 1 is completed)
    if st.session_state.step >= 2:
        st.header("Step 2: Select your drink(s)")
        drinks_options = [
            "Café con leche", "Colacao", "Descafeinado con leche", "Cortado",
            "Aguasusia", "Aguasusia susia", "Café descafeinado con leche desnatada",
            "Italiano", "Café con soja", "Té", "Manzanilla", "Nada",
        ]
        selected_drinks = st.multiselect("Choose your drinks:", drinks_options)
        if st.button("Next", key="step2_next") and selected_drinks:
            st.session_state.current_selections.append(
                {"Name": st.session_state.users[-1], "Drinks": selected_drinks}
            )
            st.session_state.step = 3  # Make step 3 visible

    # Step 3: food (only after step 2 is completed)
    if st.session_state.step >= 3:
        st.header("Step 3: Select your food(s)")
        food_options = [
            "Barrita con aceite", "Barrita con tomate", "Palmera de chocolate",
            "Palmera de chocolate blanco", "Yogurt", "Pincho de tortilla", "Nada",
        ]
        selected_food = st.multiselect("Choose your food:", food_options)
        if st.button("Save Selections", key="save_selections") and selected_food:
            latest = st.session_state.current_selections[-1]
            latest["Food"] = selected_food
            # Persist only the record that was just completed. The save helper
            # merges it into the shared file, so re-sending older rows held in
            # this session could only resurrect entries that were already
            # archived or never finished.
            save_current_selection_to_file(pd.DataFrame([latest]))
            upload_temp_file_to_repo()
            st.success(f"Selections saved for {st.session_state.users[-1]}!")
            st.session_state.step = 1  # Back to step 1 for the next user

# "Current" view: show everyone's selections and optionally archive them
elif menu == "Current":
    st.title("Current Selections of All Users")
    if st.button("Reload Selections"):
        download_temp_file_from_repo()
    current_df = load_current_selections()
    st.table(current_df)

    if st.button("Submit Summary to History"):
        timestamp = save_summary_to_history()
        st.success(f"Summary saved to history at {timestamp}")
        st.session_state.history = load_history()

        # Start a fresh poll: replace the shared file with an empty one and
        # upload it. The upload overwrites the remote copy, so no separate
        # remote delete is needed (a delete would fail when the remote file
        # does not exist).
        pd.DataFrame(columns=["Name", "Drinks", "Food"]).to_csv(TEMP_FILE, index=False)
        upload_temp_file_to_repo()

        # Drop this session's cached names and selections, and return the
        # poll to step 1 so no later step reads the now-empty user list.
        reset_selections()
        st.session_state.step = 1

# "History" view: browse past summaries
elif menu == "History":
    st.title("Breakfast Poll History")
    # Reload history if it is not already loaded
    if not st.session_state.history:
        st.session_state.history = load_history()

    if st.session_state.history:
        # Display history in reverse chronological order
        for record in reversed(st.session_state.history):
            st.subheader(f"Date: {record['Date']}")
            st.table(record["Summary"])
    else:
        st.write("No history records found.")