"""Flask app for a pairwise user study of Table-QA explanation methods.

A participant picks two explanation methods, is shown a series of paired
HTML visualizations (one per method), and records which one they prefer.
Session state is persisted as JSON files in a Hugging Face Space.
"""

import json
import logging
import os
import random
import re
import string
import tempfile
from datetime import datetime

from flask import (
    Flask,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
)
from huggingface_hub import HfApi, hf_hub_download, login

# Set up logging first so that every later module-level step can use `logger`.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Use the Hugging Face token from environment variables
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
    login(token=hf_token)
else:
    logger.error("HF_TOKEN not found in environment variables")

app = Flask(__name__)
# Prefer a key supplied by the environment; the literal is only a fallback so
# existing deployments keep working. Change it to a random secret key.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'supersecretkey')

# Directories for visualizations
VISUALIZATION_DIRS_PLAN_OF_SQLS = {
    "TP": "htmls_POS/TP",
    "TN": "htmls_POS/TN",
    "FP": "htmls_POS/FP",
    "FN": "htmls_POS/FN",
}

VISUALIZATION_DIRS_CHAIN_OF_TABLE = {
    "TP": "htmls_COT/TP",
    "TN": "htmls_COT/TN",
    "FP": "htmls_COT/FP",
    "FN": "htmls_COT/FN",
}

VISUALIZATION_DIRS_NO_XAI = {
    "TP": "htmls_NO_XAI/TP",
    "TN": "htmls_NO_XAI/TN",
    "FP": "htmls_NO_XAI/FP",
    "FN": "htmls_NO_XAI/FN",
}

VISUALIZATION_DIRS_DATER = {
    "TP": "htmls_DATER/TP",
    "TN": "htmls_DATER/TN",
    "FP": "htmls_DATER/FP",
    "FN": "htmls_DATER/FN",
}

# Sub-directories that every method's visualization directory contains.
CATEGORIES = ("TP", "TN", "FP", "FN")

# Method name (as submitted by the index form) -> suffix of its htmls_* dir.
METHOD_DIR_NAMES = {
    'No-XAI': 'NO_XAI',
    'Dater': 'DATER',
    'Chain-of-Table': 'COT',
    'Plan-of-SQLs': 'POS',
}

# Hugging Face Space used as data storage.
# Replace with your new data storage Space name.
DATA_REPO_ID = "luulinh90s/Tabular-LLM-Study-Data"
DATA_REPO_TYPE = "space"


def save_session_data(username, data):
    """Upload a timestamped JSON snapshot of *data* to the data Space.

    The snapshot is stored as
    ``session_data/{username}_{YYYYmmdd_HHMMSS}_session.json``.

    Failures are logged and swallowed (best effort); nothing is returned.

    Args:
        username: Participant name; becomes part of the file name.
        data: JSON-serialisable session state.
    """
    # NOTE(review): `username` is not sanitised before being used in the
    # repository path -- confirm whether the form restricts its characters.
    temp_file_path = None
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        file_name = f'{username}_{timestamp}_session.json'

        # Convert data to JSON string
        json_data = json.dumps(data, indent=4)

        # Write to a uniquely named temporary file. The user-supplied name is
        # deliberately kept out of the local path.
        with tempfile.NamedTemporaryFile(
            'w', suffix='_session.json', delete=False, encoding='utf-8'
        ) as f:
            f.write(json_data)
            temp_file_path = f.name

        # Upload the file to a separate Hugging Face Space dedicated to
        # data storage
        api = HfApi()
        api.upload_file(
            path_or_fileobj=temp_file_path,
            path_in_repo=f"session_data/{file_name}",
            repo_id=DATA_REPO_ID,
            repo_type=DATA_REPO_TYPE,
        )

        logger.info(f"Session data saved for user {username} in Hugging Face Data Space")
    except Exception as e:
        logger.exception(f"Error saving session data for user {username}: {e}")
    finally:
        # Remove the temporary file even when the upload failed.
        if temp_file_path and os.path.exists(temp_file_path):
            os.remove(temp_file_path)


def load_session_data(username):
    """Download and return the newest session snapshot for *username*.

    Args:
        username: Participant name used when the snapshot was saved.

    Returns:
        The decoded JSON object, or ``None`` when no snapshot exists or any
        error occurs (errors are logged).
    """
    try:
        # List every file of the data storage Space; `list_repo_files` has no
        # server-side path filter, so the filtering happens below.
        api = HfApi()
        files = api.list_repo_files(repo_id=DATA_REPO_ID, repo_type=DATA_REPO_TYPE)

        # Match exactly "<username>_<YYYYmmdd>_<HHMMSS>_session.json" so that
        # user "bob" does not pick up the files of user "bob_smith".
        pattern = re.compile(
            rf'session_data/{re.escape(username)}_\d{{8}}_\d{{6}}_session\.json'
        )
        user_files = [f for f in files if pattern.fullmatch(f)]

        if not user_files:
            logger.warning(f"No session data found for user {username}")
            return None

        # Fixed-width timestamps make the lexicographic maximum the most
        # recent file.
        latest_file = max(user_files)

        # Download the file from the data storage Space
        file_path = hf_hub_download(
            repo_id=DATA_REPO_ID,
            repo_type=DATA_REPO_TYPE,
            filename=latest_file,
        )

        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        logger.info(f"Session data loaded for user {username} from Hugging Face Data Space")
        return data
    except Exception as e:
        logger.exception(f"Error loading session data for user {username}: {e}")
        return None


def load_samples(methods):
    """Collect the visualization files available for both chosen methods.

    For each category, the file names present in both methods' directories
    are returned. Only the first two recognised methods are compared;
    unrecognised names are ignored.

    Args:
        methods: Method names, e.g. ``['Dater', 'Plan-of-SQLs']``.

    Returns:
        A list of ``{'category': ..., 'file': ...}`` dicts, ordered by
        category and then by file name.

    Raises:
        ValueError: If fewer than two recognised methods are given.
    """
    logger.info(f"Loading samples for methods: {methods}")
    method_dirs = [METHOD_DIR_NAMES[m] for m in methods if m in METHOD_DIR_NAMES]
    if len(method_dirs) < 2:
        raise ValueError(f"Expected two known methods, got: {methods}")

    samples = []
    for category in CATEGORIES:
        dir_a = f'htmls_{method_dirs[0]}/{category}'
        dir_b = f'htmls_{method_dirs[1]}/{category}'
        try:
            files_a = set(os.listdir(dir_a))
            files_b = set(os.listdir(dir_b))
        except FileNotFoundError as e:
            # A missing category directory should not abort the whole load.
            logger.warning(f"Skipping category {category}: {e}")
            continue

        # Sort so that the caller's seeded random sampling is reproducible;
        # set iteration order of strings differs between processes.
        for file in sorted(files_a & files_b):
            samples.append({
                'category': category,
                'file': file,
            })

    return samples


def select_balanced_samples(samples):
    """Randomly pick up to 10 entries from *samples*.

    Sampling uses the global ``random`` state, so seed it beforehand for
    reproducible selections. Despite the name, no per-category balancing is
    performed.

    Returns:
        The selected samples, or an empty list if sampling fails.
    """
    try:
        selected_samples = random.sample(samples, min(10, len(samples)))
        logger.info(f"Selected balanced samples: {len(selected_samples)}")
        return selected_samples
    except Exception:
        logger.exception("Error selecting balanced samples")
        return []


def generate_random_string(length=8):
    """Return a random string of ASCII letters and digits of *length*."""
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))


@app.route('/', methods=['GET', 'POST'])
def index():
    """Render the start page (GET) or create a new study session (POST).

    The POST form must provide ``username``, an integer ``seed`` and
    ``method`` as two comma-separated method names.
    """
    logger.info("Rendering index page.")
    if request.method == 'POST':
        username = request.form.get('username')
        seed = request.form.get('seed')
        # Default to '' so a missing field yields a 400 below rather than an
        # AttributeError on None.
        methods = [m.strip() for m in request.form.get('method', '').split(',')]

        if not username or not seed or len(methods) != 2:
            logger.error("Missing username, seed, or incorrect number of methods.")
            return "Please fill in all fields and select exactly two methods.", 400

        try:
            seed = int(seed)
        except ValueError:
            logger.error("Seed is not an integer: %r", seed)
            return "Seed must be an integer.", 400

        try:
            random.seed(seed)
            all_samples = load_samples(methods)
            selected_samples = select_balanced_samples(all_samples)

            logger.info(f"Number of selected samples: {len(selected_samples)}")

            if len(selected_samples) == 0:
                logger.error("No samples were selected.")
                return "No samples were selected", 500

            session_data = {
                'username': username,
                'seed': seed,
                'methods': methods,
                'selected_samples': selected_samples,
                'current_index': 0,
                'responses': [],
                'start_time': datetime.now().isoformat(),
            }
            save_session_data(username, session_data)
            logger.info(f"Session data initialized for user: {username}")

            return redirect(url_for('experiment', username=username))
        except Exception as e:
            logger.exception(f"Error in index route: {e}")
            return "An error occurred", 500
    return render_template('index.html')


@app.route('/experiment/<username>', methods=['GET', 'POST'])
def experiment(username):
    """Show the current pair of visualizations for *username*.

    Redirects to the index page when no session exists and to the
    completion page once every selected sample has been shown. A sample
    whose files are missing on disk is skipped.
    """
    try:
        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        selected_samples = session_data['selected_samples']
        methods = session_data['methods']
        current_index = session_data['current_index']

        if current_index >= len(selected_samples):
            return redirect(url_for('completed', username=username))

        sample = selected_samples[current_index]
        method_a, method_b = methods

        # Show the file that was selected for this trial, not merely the
        # first file both methods happen to share.
        dir_a = os.path.join(get_visualization_dir(method_a), sample['category'])
        dir_b = os.path.join(get_visualization_dir(method_b), sample['category'])
        file_a = os.path.join(dir_a, sample['file'])
        file_b = os.path.join(dir_b, sample['file'])

        if not (os.path.isfile(file_a) and os.path.isfile(file_b)):
            logger.error(f"Missing files for comparison at index {current_index}")
            session_data['current_index'] += 1
            save_session_data(username, session_data)
            return redirect(url_for('experiment', username=username))

        visualization_a = url_for('send_visualization', filename=file_a)
        visualization_b = url_for('send_visualization', filename=file_b)

        statement = """
        You are given two explanations that describe the reasoning process of the Table QA model.
        Please analyze the explanations and determine which one provides a clearer and more accurate reasoning process.
        """

        return render_template(
            'experiment.html',
            sample_id=current_index,
            statement=statement,
            visualization_a=visualization_a,
            visualization_b=visualization_b,
            method_a=method_a,
            method_b=method_b,
            username=username,
        )
    except Exception as e:
        logger.exception(f"An error occurred in the experiment route: {e}")
        return "An error occurred", 500


def get_visualization_dir(method):
    """Return the top-level visualization directory for *method*.

    Any name other than "No-XAI", "Dater" or "Chain-of-Table" maps to the
    Plan-of-SQLs directory.
    """
    return f"htmls_{METHOD_DIR_NAMES.get(method, 'POS')}"


@app.route('/feedback', methods=['POST'])
def feedback():
    """Record the participant's preference and advance to the next sample.

    Expects the form fields ``username`` and ``feedback`` (the preferred
    method).
    """
    try:
        username = request.form.get('username')
        feedback = request.form.get('feedback')
        if not username or feedback is None:
            logger.error("Feedback form is missing username or feedback.")
            return "Missing username or feedback.", 400

        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        # Store the feedback
        session_data['responses'].append({
            'sample_id': session_data['current_index'],
            'preferred_method': feedback,
            'timestamp': datetime.now().isoformat(),
        })

        # Move to the next sample
        session_data['current_index'] += 1

        # Save updated session data
        save_session_data(username, session_data)
        logger.info(f"Feedback saved for user {username}, sample {session_data['current_index'] - 1}")

        if session_data['current_index'] >= len(session_data['selected_samples']):
            return redirect(url_for('completed', username=username))

        return redirect(url_for('experiment', username=username))
    except Exception as e:
        logger.exception(f"Error in feedback route: {e}")
        return "An error occurred", 500


@app.route('/completed/<username>')
def completed(username):
    """Finalize the session and show preference percentages per method."""
    try:
        session_data = load_session_data(username)
        if not session_data:
            logger.error(f"No session data found for user: {username}")
            return redirect(url_for('index'))

        session_data['end_time'] = datetime.now().isoformat()
        methods = session_data['methods']
        responses = session_data['responses']

        preferences = {method: 0 for method in methods}
        total_responses = len(responses)

        for response in responses:
            preferred_method = response['preferred_method']
            # Tolerate a value that is not one of the two stored methods
            # instead of failing the whole page with a KeyError.
            preferences[preferred_method] = preferences.get(preferred_method, 0) + 1

        for method in preferences:
            # With no responses at all, report 0% rather than dividing by 0.
            if total_responses:
                preferences[method] = round((preferences[method] / total_responses) * 100, 2)
            else:
                preferences[method] = 0.0

        session_data['preferences'] = preferences
        save_session_data(username, session_data)

        return render_template('completed.html', preferences=preferences)
    except Exception as e:
        logger.exception(f"An error occurred in the completed route: {e}")
        return "An error occurred", 500


@app.route('/visualizations/<path:filename>')
def send_visualization(filename):
    """Serve a visualization file addressed relative to the working directory.

    Returns 403 when the resolved path escapes the working directory and
    404 when the file does not exist.
    """
    logger.info(f"Attempting to serve file: {filename}")
    # Ensure the path is safe and doesn't allow access to files outside the
    # intended directory.
    # NOTE(review): anything under the working directory is servable,
    # including app.log and the source; consider restricting to htmls_* dirs.
    base_dir = os.path.abspath(os.getcwd())
    file_path = os.path.normpath(os.path.join(base_dir, filename))
    try:
        # A plain startswith() check would also accept sibling directories
        # such as "/app_evil" for base "/app"; compare path components.
        inside_base = os.path.commonpath([base_dir, file_path]) == base_dir
    except ValueError:
        inside_base = False
    if not inside_base:
        return "Access denied", 403

    if not os.path.exists(file_path):
        return "File not found", 404

    directory = os.path.dirname(file_path)
    file_name = os.path.basename(file_path)
    logger.info(f"Serving file from directory: {directory}, filename: {file_name}")
    return send_from_directory(directory, file_name)


if __name__ == "__main__":
    os.makedirs('session_data', exist_ok=True)  # Ensure the directory for session files exists
    # NOTE(review): debug=True on 0.0.0.0 exposes the Werkzeug debugger;
    # confirm this is intended for the deployed Space.
    app.run(host="0.0.0.0", port=7860, debug=True)