Spaces:
Runtime error
Runtime error
| from flask import Flask, render_template, request, redirect, url_for, send_from_directory, session | |
| import json | |
| import random | |
| import os | |
| import string | |
| import logging | |
| from datetime import datetime | |
| import os | |
| from huggingface_hub import login | |
| # Use the Hugging Face token from environment variables | |
| hf_token = os.environ.get("HF_TOKEN") | |
| if hf_token: | |
| login(token=hf_token) | |
| else: | |
| logger.error("HF_TOKEN not found in environment variables") | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler("app.log"), | |
| logging.StreamHandler() | |
| ]) | |
| logger = logging.getLogger(__name__) | |
| app = Flask(__name__) | |
| app.config['SECRET_KEY'] = 'supersecretkey' # Change this to a random secret key | |
| # Directories for visualizations | |
| VISUALIZATION_DIRS_PLAN_OF_SQLS = { | |
| "TP": "htmls_POS/TP", | |
| "TN": "htmls_POS/TN", | |
| "FP": "htmls_POS/FP", | |
| "FN": "htmls_POS/FN" | |
| } | |
| VISUALIZATION_DIRS_CHAIN_OF_TABLE = { | |
| "TP": "htmls_COT/TP", | |
| "TN": "htmls_COT/TN", | |
| "FP": "htmls_COT/FP", | |
| "FN": "htmls_COT/FN" | |
| } | |
| VISUALIZATION_DIRS_NO_XAI = { | |
| "TP": "htmls_NO_XAI/TP", | |
| "TN": "htmls_NO_XAI/TN", | |
| "FP": "htmls_NO_XAI/FP", | |
| "FN": "htmls_NO_XAI/FN" | |
| } | |
| VISUALIZATION_DIRS_DATER = { | |
| "TP": "htmls_DATER/TP", | |
| "TN": "htmls_DATER/TN", | |
| "FP": "htmls_DATER/FP", | |
| "FN": "htmls_DATER/FN" | |
| } | |
| import json | |
| import os | |
| from datetime import datetime | |
| from huggingface_hub import HfApi | |
| from huggingface_hub import HfApi | |
| def save_session_data(username, data): | |
| try: | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| file_name = f'{username}_{timestamp}_session.json' | |
| # Convert data to JSON string | |
| json_data = json.dumps(data, indent=4) | |
| # Create a temporary file | |
| temp_file_path = f"/tmp/{file_name}" | |
| with open(temp_file_path, 'w') as f: | |
| f.write(json_data) | |
| # Upload the file to a separate Hugging Face Space dedicated to data storage | |
| api = HfApi() | |
| api.upload_file( | |
| path_or_fileobj=temp_file_path, | |
| path_in_repo=f"session_data/{file_name}", | |
| repo_id="luulinh90s/Tabular-LLM-Study-Data", # Replace with your new data storage Space name | |
| repo_type="space", | |
| ) | |
| # Remove the temporary file | |
| os.remove(temp_file_path) | |
| logger.info(f"Session data saved for user {username} in Hugging Face Data Space") | |
| except Exception as e: | |
| logger.exception(f"Error saving session data for user {username}: {e}") | |
| from huggingface_hub import hf_hub_download, HfApi | |
| def load_session_data(username): | |
| try: | |
| # List files in the session_data directory of the data storage Space | |
| api = HfApi() | |
| files = api.list_repo_files(repo_id="luulinh90s/Tabular-LLM-Study-Data", repo_type="space", path="session_data") | |
| # Filter and sort files for the user | |
| user_files = [f for f in files if f.startswith(f'session_data/{username}_') and f.endswith('_session.json')] | |
| if not user_files: | |
| logger.warning(f"No session data found for user {username}") | |
| return None | |
| # Get the most recent file | |
| latest_file = sorted(user_files, reverse=True)[0] | |
| # Download the file from the data storage Space | |
| file_path = hf_hub_download(repo_id="luulinh90s/Tabular-LLM-Study-Data", repo_type="space", | |
| filename=latest_file) | |
| with open(file_path, 'r') as f: | |
| data = json.load(f) | |
| logger.info(f"Session data loaded for user {username} from Hugging Face Data Space") | |
| return data | |
| except Exception as e: | |
| logger.exception(f"Error loading session data for user {username}: {e}") | |
| return None | |
| def load_samples(methods): | |
| logger.info(f"Loading samples for methods: {methods}") | |
| samples = [] | |
| categories = ["TP", "TN", "FP", "FN"] | |
| method_dirs = [] | |
| for method in methods: | |
| if method == 'No-XAI': | |
| method_dirs.append('NO_XAI') | |
| elif method == 'Dater': | |
| method_dirs.append('DATER') | |
| elif method == 'Chain-of-Table': | |
| method_dirs.append('COT') | |
| elif method == 'Plan-of-SQLs': | |
| method_dirs.append('POS') | |
| for category in categories: | |
| dir_a = f'htmls_{method_dirs[0].upper()}/{category}' | |
| dir_b = f'htmls_{method_dirs[1].upper()}/{category}' | |
| files_a = set(os.listdir(dir_a)) | |
| files_b = set(os.listdir(dir_b)) | |
| matching_files = files_a & files_b | |
| for file in matching_files: | |
| samples.append({ | |
| 'category': category, | |
| 'file': file | |
| }) | |
| return samples | |
| def select_balanced_samples(samples): | |
| try: | |
| selected_samples = random.sample(samples, min(10, len(samples))) | |
| logger.info(f"Selected balanced samples: {len(selected_samples)}") | |
| return selected_samples | |
| except Exception as e: | |
| logger.exception("Error selecting balanced samples") | |
| return [] | |
| def generate_random_string(length=8): | |
| return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) | |
| def index(): | |
| logger.info("Rendering index page.") | |
| if request.method == 'POST': | |
| username = request.form.get('username') | |
| seed = request.form.get('seed') | |
| methods = request.form.get('method').split(',') | |
| if not username or not seed or len(methods) != 2: | |
| logger.error("Missing username, seed, or incorrect number of methods.") | |
| return "Please fill in all fields and select exactly two methods.", 400 | |
| try: | |
| seed = int(seed) | |
| random.seed(seed) | |
| all_samples = load_samples(methods) | |
| selected_samples = select_balanced_samples(all_samples) | |
| logger.info(f"Number of selected samples: {len(selected_samples)}") | |
| if len(selected_samples) == 0: | |
| logger.error("No samples were selected.") | |
| return "No samples were selected", 500 | |
| session_data = { | |
| 'username': username, | |
| 'seed': seed, | |
| 'methods': methods, | |
| 'selected_samples': selected_samples, | |
| 'current_index': 0, | |
| 'responses': [], | |
| 'start_time': datetime.now().isoformat() | |
| } | |
| save_session_data(username, session_data) | |
| logger.info(f"Session data initialized for user: {username}") | |
| return redirect(url_for('experiment', username=username)) | |
| except Exception as e: | |
| logger.exception(f"Error in index route: {e}") | |
| return "An error occurred", 500 | |
| return render_template('index.html') | |
| def experiment(username): | |
| try: | |
| session_data = load_session_data(username) | |
| if not session_data: | |
| logger.error(f"No session data found for user: {username}") | |
| return redirect(url_for('index')) | |
| selected_samples = session_data['selected_samples'] | |
| methods = session_data['methods'] | |
| current_index = session_data['current_index'] | |
| if current_index >= len(selected_samples): | |
| return redirect(url_for('completed', username=username)) | |
| sample = selected_samples[current_index] | |
| method_a, method_b = methods | |
| # Find matching files for both methods | |
| file_a = None | |
| file_b = None | |
| if method_a == 'No-XAI': | |
| method_a_dir = ('NO_XAI') | |
| elif method_a == 'Dater': | |
| method_a_dir = ('DATER') | |
| elif method_a == 'Chain-of-Table': | |
| method_a_dir = ('COT') | |
| elif method_a == 'Plan-of-SQLs': | |
| method_a_dir = ('POS') | |
| if method_b == 'No-XAI': | |
| method_b_dir = ('NO_XAI') | |
| elif method_b == 'Dater': | |
| method_b_dir = ('DATER') | |
| elif method_b == 'Chain-of-Table': | |
| method_b_dir = ('COT') | |
| elif method_b == 'Plan-of-SQLs': | |
| method_b_dir = ('POS') | |
| for category in ['TP', 'TN', 'FP', 'FN']: | |
| dir_a = f'htmls_{method_a_dir.upper()}/{category}' | |
| dir_b = f'htmls_{method_b_dir.upper()}/{category}' | |
| files_a = os.listdir(dir_a) | |
| files_b = os.listdir(dir_b) | |
| matching_files = set(files_a) & set(files_b) | |
| if matching_files: | |
| file_a = os.path.join(dir_a, next(iter(matching_files))) | |
| file_b = os.path.join(dir_b, next(iter(matching_files))) | |
| break | |
| if not file_a or not file_b: | |
| logger.error(f"Missing files for comparison at index {current_index}") | |
| session_data['current_index'] += 1 | |
| save_session_data(username, session_data) | |
| return redirect(url_for('experiment', username=username)) | |
| visualization_a = url_for('send_visualization', filename=file_a) | |
| visualization_b = url_for('send_visualization', filename=file_b) | |
| statement = """ | |
| You are given two explanations that describe the reasoning process of the Table QA model. | |
| Please analyze the explanations and determine which one provides a clearer and more accurate reasoning process. | |
| """ | |
| return render_template('experiment.html', | |
| sample_id=current_index, | |
| statement=statement, | |
| visualization_a=visualization_a, | |
| visualization_b=visualization_b, | |
| method_a=method_a, | |
| method_b=method_b, | |
| username=username) | |
| except Exception as e: | |
| logger.exception(f"An error occurred in the experiment route: {e}") | |
| return "An error occurred", 500 | |
| def get_visualization_dir(method): | |
| if method == "No-XAI": | |
| return 'htmls_NO_XAI' | |
| elif method == "Dater": | |
| return 'htmls_DATER' | |
| elif method == "Chain-of-Table": | |
| return 'htmls_COT' | |
| else: # Plan-of-SQLs | |
| return 'htmls_POS' | |
| def feedback(): | |
| try: | |
| username = request.form['username'] | |
| feedback = request.form['feedback'] | |
| session_data = load_session_data(username) | |
| if not session_data: | |
| logger.error(f"No session data found for user: {username}") | |
| return redirect(url_for('index')) | |
| # Store the feedback | |
| session_data['responses'].append({ | |
| 'sample_id': session_data['current_index'], | |
| 'preferred_method': feedback, | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| # Move to the next sample | |
| session_data['current_index'] += 1 | |
| # Save updated session data | |
| save_session_data(username, session_data) | |
| logger.info(f"Feedback saved for user {username}, sample {session_data['current_index'] - 1}") | |
| if session_data['current_index'] >= len(session_data['selected_samples']): | |
| return redirect(url_for('completed', username=username)) | |
| return redirect(url_for('experiment', username=username)) | |
| except Exception as e: | |
| logger.exception(f"Error in feedback route: {e}") | |
| return "An error occurred", 500 | |
| def completed(username): | |
| try: | |
| session_data = load_session_data(username) | |
| if not session_data: | |
| logger.error(f"No session data found for user: {username}") | |
| return redirect(url_for('index')) | |
| session_data['end_time'] = datetime.now().isoformat() | |
| methods = session_data['methods'] | |
| responses = session_data['responses'] | |
| preferences = {method: 0 for method in methods} | |
| total_responses = len(responses) | |
| for response in responses: | |
| preferred_method = response['preferred_method'] | |
| preferences[preferred_method] += 1 | |
| for method in preferences: | |
| preferences[method] = round((preferences[method] / total_responses) * 100, 2) | |
| session_data['preferences'] = preferences | |
| save_session_data(username, session_data) | |
| return render_template('completed.html', preferences=preferences) | |
| except Exception as e: | |
| logger.exception(f"An error occurred in the completed route: {e}") | |
| return "An error occurred", 500 | |
| def send_visualization(filename): | |
| logger.info(f"Attempting to serve file: {filename}") | |
| # Ensure the path is safe and doesn't allow access to files outside the intended directory | |
| base_dir = os.getcwd() | |
| file_path = os.path.normpath(os.path.join(base_dir, filename)) | |
| if not file_path.startswith(base_dir): | |
| return "Access denied", 403 | |
| if not os.path.exists(file_path): | |
| return "File not found", 404 | |
| directory = os.path.dirname(file_path) | |
| file_name = os.path.basename(file_path) | |
| logger.info(f"Serving file from directory: {directory}, filename: {file_name}") | |
| return send_from_directory(directory, file_name) | |
| if __name__ == "__main__": | |
| os.makedirs('session_data', exist_ok=True) # Ensure the directory for session files exists | |
| app.run(host="0.0.0.0", port=7860, debug=True) |